Learning to walk using Reinforcement Learning


Antonio Lisi

Teaching a robot to walk


To my grandma, I’ll miss you

Intro

Hello everyone! In this post, we’re going to teach a robot to walk using one of the latest state-of-the-art algorithms, Soft Actor-Critic (SAC).

As always, we implement everything from scratch using TensorFlow 2. We’ll reuse a lot of the code developed in the post about DDPG, so if you haven’t read it, I recommend doing so before going forward.

Environment

In the DDPG post, we solved two environments provided by OpenAI. But they were too easy; I wanted to try more challenging continuous-action-space environments.

One of the most popular packages is MuJoCo. MuJoCo stands for Multi-Joint dynamics with Contact and, citing from the site, “is a physics engine aiming to facilitate research and development in robotics, biomechanics, graphics and animation, and other areas where fast and accurate simulation is needed. It offers a unique combination of speed, accuracy and modeling power, yet it is not merely a better simulator”. Unfortunately, MuJoCo isn’t free; it requires a license.

After googling around, I found an open-source alternative called PyBullet that is very similar to MuJoCo. You can install it by running pip install pybullet, and you can find the list of the available environments here.

Of all the environments available in PyBullet, I decided to solve “Humanoid”, where the goal is for a humanoid robot to learn to walk. Humanoid and the other environments are ports of the Roboschool environments to PyBullet. The Roboschool environments are harder than the MuJoCo Gym environments; in particular, from the documentation: “Humanoid benefits from more realistic energy cost (= torque × angular velocity) subtracted from the reward.”

Let’s see how a Random Agent performs in this case:

He just continues to fall…

Ok, we have a lot to improve. Let’s start talking about the algorithm that we’re going to use to teach this robot how to walk.

Soft Actor Critic (SAC)

Introduced in the paper Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement Learning by Tuomas Haarnoja et al., it is considered one of the best algorithms for solving continuous-action-space environments.

SAC combines off-policy updates with a stable stochastic actor-critic formulation. It forms a bridge between stochastic policy optimization and DDPG-style approaches.

SAC simultaneously tries to maximize the expected return and the entropy of the policy, using the same intuition that we saw in PPO (you can find the post here). The main difference is that in SAC we directly maximize the entropy, while in PPO we used the entropy as a regularizer. But the goal is the same: we want to encourage exploration and penalize policies with low entropy.

From the original paper, we can see what SAC maximizes:
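For reference, the maximum-entropy objective from the paper can be written in LaTeX as:

```latex
J(\pi) = \sum_{t=0}^{T} \mathbb{E}_{(s_t, a_t) \sim \rho_\pi}
    \left[ r(s_t, a_t) + \alpha \, \mathcal{H}\big(\pi(\,\cdot \mid s_t)\big) \right]
```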

The first part is the expected return, while the second part is the entropy of the policy. The α parameter is the temperature and determines the relative importance of the entropy term against the expected return. As stated in the paper, the temperature can always be subsumed into the reward by scaling the reward by α⁻¹.

Also from the paper, maximum entropy reinforcement learning can make use of Q-functions and value functions:
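For reference, the soft Q-function and soft value function from the paper satisfy:

```latex
Q(s_t, a_t) = r(s_t, a_t) + \gamma \, \mathbb{E}_{s_{t+1} \sim p}\!\left[ V(s_{t+1}) \right]
```

```latex
V(s_t) = \mathbb{E}_{a_t \sim \pi}\!\left[ Q(s_t, a_t) - \log \pi(a_t \mid s_t) \right]
```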

SAC makes use of three types of networks to approximate:

  • the state value function V
  • the soft Q-function Q
  • the policy function π

As you can see in the above formulas, the V and Q functions are related through the policy. So in principle, we could derive one from the other, and we don’t need two separate approximations. But the authors say that in practice having separate function approximators helps in convergence.

The SAC algorithm also uses the clipped double-Q trick, introduced in Addressing Function Approximation Error in Actor-Critic Methods by Fujimoto et al. There will be two networks predicting the Q-values, and we’ll take the minimum of the two predictions. This helps deal with Q-value overestimation during training.
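As a tiny sketch with made-up Q-values, the trick is just an element-wise minimum over the two critics’ predictions:

```python
import numpy as np

# Hypothetical Q-value predictions from the two critic networks
# for a batch of 3 state-action pairs
q_values_0 = np.array([1.2, 0.8, 2.5])
q_values_1 = np.array([0.9, 1.1, 2.4])

# Clipped double-Q: keep the more pessimistic estimate to curb overestimation
q_values = np.minimum(q_values_0, q_values_1)
print(q_values)  # [0.9 0.8 2.4]
```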

So in total, we’ll have 5 networks:

  • the Actor that defines the policy
  • two Critic Q-values
  • the Critic Value
  • the Target Critic Value

The target critic value is used as we saw in the DDPG post. It is a time-delayed copy of the original network that slowly updates its weights to improve learning stability.
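The update itself isn’t shown in the article; a minimal numpy sketch of the Polyak soft update that update_target_networks performs (the lists of arrays stand in for Keras weight tensors) could look like this:

```python
import numpy as np

def soft_update(target_weights, online_weights, tau):
    """Polyak averaging: target <- tau * online + (1 - tau) * target."""
    return [tau * w + (1.0 - tau) * tw
            for w, tw in zip(online_weights, target_weights)]

# Toy example with one weight tensor per network
target = [np.zeros(4)]
online = [np.ones(4)]

# With tau = 0.005, each target entry moves 0.5% toward the online value
target = soft_update(target, online, tau=0.005)
print(target[0])  # [0.005 0.005 0.005 0.005]
```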

The Q-networks are trained using the MSE objective by doing Bellman approximation using the target value network:
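In LaTeX, that objective from the paper reads:

```latex
J_Q(\theta) = \mathbb{E}_{(s_t, a_t) \sim \mathcal{D}}
    \left[ \tfrac{1}{2} \Big( Q_\theta(s_t, a_t)
    - \big( r(s_t, a_t) + \gamma \, \mathbb{E}_{s_{t+1} \sim p}[\, V_{\bar{\psi}}(s_{t+1}) \,] \big) \Big)^{2} \right]
```

where V with the bar denotes the target value network.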

The V-network is trained using the MSE objective with the following target:
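Written out (using the clipped double-Q minimum over the two critics, as the code does):

```latex
J_V(\psi) = \mathbb{E}_{s_t \sim \mathcal{D}}
    \left[ \tfrac{1}{2} \Big( V_\psi(s_t)
    - \mathbb{E}_{a_t \sim \pi_\phi}\big[ \min_{i=1,2} Q_{\theta_i}(s_t, a_t)
    - \log \pi_\phi(a_t \mid s_t) \big] \Big)^{2} \right]
```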

The actor is trained to maximize the expected future return plus expected future entropy:
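For reference, the actor objective from the paper reads:

```latex
J_\pi(\phi) = \mathbb{E}_{s_t \sim \mathcal{D},\, \epsilon_t \sim \mathcal{N}}
    \left[ \log \pi_\phi\big(f_\phi(\epsilon_t; s_t) \mid s_t\big)
    - Q_\theta\big(s_t, f_\phi(\epsilon_t; s_t)\big) \right]
```

where f_φ(ε_t; s_t) is the action produced by the reparameterization trick; minimizing this loss maximizes the expected Q-value plus the policy entropy.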

We’ll use here the reparameterization trick to make the randomness an input of the network. This will be clearer in the code.

Replay Buffer

As always we start with the Replay Buffer:

import os
import json

import numpy as np


class ReplayBuffer():
    def __init__(self, env, buffer_capacity=BUFFER_CAPACITY, batch_size=BATCH_SIZE, min_size_buffer=MIN_SIZE_BUFFER):
        self.buffer_capacity = buffer_capacity
        self.batch_size = batch_size
        self.min_size_buffer = min_size_buffer
        self.buffer_counter = 0
        self.n_games = 0

        self.states = np.zeros((self.buffer_capacity, env.observation_space.shape[0]))
        self.actions = np.zeros((self.buffer_capacity, env.action_space.shape[0]))
        self.rewards = np.zeros((self.buffer_capacity))
        self.next_states = np.zeros((self.buffer_capacity, env.observation_space.shape[0]))
        self.dones = np.zeros((self.buffer_capacity), dtype=bool)

    def __len__(self):
        return self.buffer_counter

    def add_record(self, state, action, reward, next_state, done):
        # Wrap the index around when the counter reaches buffer_capacity
        # (e.g. 1 % 100 = 1 and 101 % 100 = 1) so we overwrite the oldest entries
        index = self.buffer_counter % self.buffer_capacity
        self.states[index] = state
        self.actions[index] = action
        self.rewards[index] = reward
        self.next_states[index] = next_state
        self.dones[index] = done

        # Update the counter after recording something
        self.buffer_counter += 1

    def check_buffer_size(self):
        return self.buffer_counter >= self.batch_size and self.buffer_counter >= self.min_size_buffer

    def update_n_games(self):
        self.n_games += 1

    def get_minibatch(self):
        # If the counter is less than the capacity, we don't want to sample empty records;
        # if the counter is higher, we don't index with the counter itself,
        # because older records were overwritten to make space for new ones
        buffer_range = min(self.buffer_counter, self.buffer_capacity)
        batch_index = np.random.choice(buffer_range, self.batch_size, replace=False)

        state = self.states[batch_index]
        action = self.actions[batch_index]
        reward = self.rewards[batch_index]
        next_state = self.next_states[batch_index]
        done = self.dones[batch_index]

        return state, action, reward, next_state, done

    def save(self, folder_name):
        """
        Save the replay buffer
        """
        if not os.path.isdir(folder_name):
            os.mkdir(folder_name)
        np.save(folder_name + '/states.npy', self.states)
        np.save(folder_name + '/actions.npy', self.actions)
        np.save(folder_name + '/rewards.npy', self.rewards)
        np.save(folder_name + '/next_states.npy', self.next_states)
        np.save(folder_name + '/dones.npy', self.dones)

        dict_info = {"buffer_counter": self.buffer_counter, "n_games": self.n_games}
        with open(folder_name + '/dict_info.json', 'w') as f:
            json.dump(dict_info, f)

    def load(self, folder_name):
        """
        Load the replay buffer
        """
        self.states = np.load(folder_name + '/states.npy')
        self.actions = np.load(folder_name + '/actions.npy')
        self.rewards = np.load(folder_name + '/rewards.npy')
        self.next_states = np.load(folder_name + '/next_states.npy')
        self.dones = np.load(folder_name + '/dones.npy')

        with open(folder_name + '/dict_info.json', 'r') as f:
            dict_info = json.load(f)
        self.buffer_counter = dict_info["buffer_counter"]
        self.n_games = dict_info["n_games"]

It’s basically the same code from the DDPG article. We store all the states, actions, rewards, next states, and terminal flags from the interaction with the environment by calling add_record. We sample a random minibatch with get_minibatch, and we can save and load the entire replay buffer with the save and load methods.
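The circular-index logic of add_record can be seen in isolation with a toy buffer of capacity 3 (plain numpy, no environment needed):

```python
import numpy as np

capacity = 3
rewards = np.zeros(capacity)
counter = 0

# The 4th record wraps around (3 % 3 == 0) and overwrites the oldest entry
for r in [1.0, 2.0, 3.0, 4.0]:
    rewards[counter % capacity] = r
    counter += 1

print(rewards)  # [4. 2. 3.]
```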

Networks

As said before, we need to define three types of networks that approximate the state value function V, the soft Q-function, and the policy function π. So we need two types of Critics and one Actor.

Critic Q Value

class Critic(tf.keras.Model):
    def __init__(self, name, hidden_0=CRITIC_HIDDEN_0, hidden_1=CRITIC_HIDDEN_1):
        super(Critic, self).__init__()
        self.hidden_0 = hidden_0
        self.hidden_1 = hidden_1
        self.net_name = name

        self.dense_0 = Dense(self.hidden_0, activation='relu')
        self.dense_1 = Dense(self.hidden_1, activation='relu')
        self.q_value = Dense(1, activation=None)

    def call(self, state, action):
        state_action_value = self.dense_0(tf.concat([state, action], axis=1))
        state_action_value = self.dense_1(state_action_value)
        q_value = self.q_value(state_action_value)
        return q_value

Nothing new here, it’s basically the same used in the DDPG article.

Critic Value

class CriticValue(tf.keras.Model):
    def __init__(self, name, hidden_0=CRITIC_HIDDEN_0, hidden_1=CRITIC_HIDDEN_1):
        super(CriticValue, self).__init__()
        self.hidden_0 = hidden_0
        self.hidden_1 = hidden_1
        self.net_name = name

        self.dense_0 = Dense(self.hidden_0, activation='relu')
        self.dense_1 = Dense(self.hidden_1, activation='relu')
        self.value = Dense(1, activation=None)

    def call(self, state):
        value = self.dense_0(state)
        value = self.dense_1(value)
        value = self.value(value)
        return value

We can see that we don’t need the actions as input to approximate the state value, just the state (the state value function maps a state to its expected return).

Actor

class Actor(tf.keras.Model):
    def __init__(self, name, upper_bound, actions_dim, hidden_0=CRITIC_HIDDEN_0, hidden_1=CRITIC_HIDDEN_1,
                 epsilon=EPSILON, log_std_min=LOG_STD_MIN, log_std_max=LOG_STD_MAX):
        super(Actor, self).__init__()
        self.hidden_0 = hidden_0
        self.hidden_1 = hidden_1
        self.actions_dim = actions_dim
        self.net_name = name
        self.upper_bound = upper_bound
        self.epsilon = epsilon
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.dense_0 = Dense(self.hidden_0, activation='relu')
        self.dense_1 = Dense(self.hidden_1, activation='relu')
        self.mean = Dense(self.actions_dim, activation=None)
        self.log_std = Dense(self.actions_dim, activation=None)

    def call(self, state):
        policy = self.dense_0(state)
        policy = self.dense_1(policy)
        mean = self.mean(policy)
        log_std = self.log_std(policy)
        log_std = tf.clip_by_value(log_std, self.log_std_min, self.log_std_max)
        return mean, log_std

    def get_action_log_probs(self, state, reparameterization_trick=True):
        mean, log_std = self.call(state)
        std = tf.exp(log_std)
        normal_distr = tfp.distributions.Normal(mean, std)

        if reparameterization_trick:
            # Reparameterization trick: the noise is an input to the computation
            z = tf.random.normal(shape=mean.shape, mean=0., stddev=1.)
            actions = mean + std * z
        else:
            actions = normal_distr.sample()

        action = tf.math.tanh(actions) * self.upper_bound
        # Change-of-variables correction for the tanh squashing
        # (computed on the squashed value before scaling by upper_bound)
        log_probs = normal_distr.log_prob(actions) - tf.math.log(1 - tf.math.pow(tf.math.tanh(actions), 2) + self.epsilon)
        log_probs = tf.math.reduce_sum(log_probs, axis=1, keepdims=True)
        return action, log_probs

The Actor network returns two outputs: the mean and the log standard deviation. We work with the log standard deviation because exponentiating it always yields a positive standard deviation. The log standard deviation is also clipped to an interval to avoid extreme values that could create problems at the beginning of training.

Note that in the get_action_log_probs method, we activate the reparameterization trick using a flag. When true, we sample some noise from a Standard Normal distribution, multiply it with the standard deviation (exponential value of the log that the Actor returns) and add the result to the mean. When False, we just use a Normal distribution with mean and standard deviation equal to the values returned by the Network.
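The last lines of get_action_log_probs also apply a change-of-variables correction for the tanh squashing. A numpy sketch of the same computation (assuming actions bounded in [−1, 1], so the squashed value equals the final action):

```python
import numpy as np

def tanh_gaussian_log_prob(u, mean, std, eps=1e-6):
    """Log-density of the action a = tanh(u), where u ~ Normal(mean, std).

    The log(1 - tanh(u)^2) term is the change-of-variables correction
    for squashing the Gaussian sample through tanh.
    """
    log_normal = -0.5 * (((u - mean) / std) ** 2
                         + 2.0 * np.log(std)
                         + np.log(2.0 * np.pi))
    correction = np.log(1.0 - np.tanh(u) ** 2 + eps)
    return np.sum(log_normal - correction, axis=-1)

# At u = 0 the correction vanishes, so this matches the plain
# standard-Normal log-density summed over 2 action dimensions
lp = tanh_gaussian_log_prob(np.zeros(2), np.zeros(2), np.ones(2))
print(lp)  # about -1.8379
```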

The concept of the reparameterization trick was introduced in Auto-Encoding Variational Bayes by Kingma et al., and we need it to backpropagate through a random node. If you want to go deeper into this concept, I found a great explanation here that clarified it for me. But why are we using it here? Because it yields lower-variance gradient estimates, which helps the algorithm converge faster.
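A numpy sketch of the trick (the mean and log-std values are illustrative): the noise ε is sampled outside the network, so the action is a deterministic, differentiable function of the mean and standard deviation:

```python
import numpy as np

rng = np.random.default_rng(seed=42)

# Illustrative actor outputs for one state with 2 action dimensions
mean = np.array([0.2, -0.5])
log_std = np.array([-1.0, -1.0])
std = np.exp(log_std)

# Reparameterization: randomness enters only through eps...
eps = rng.standard_normal(mean.shape)
raw_action = mean + std * eps   # ...so this is differentiable w.r.t. mean and std

# Squash into the valid action range, as in get_action_log_probs
action = np.tanh(raw_action)
```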

Agent

We can now look at the “brain” of the Agent:

class Agent:
    def __init__(self, env, path_save=PATH_SAVE, path_load=PATH_LOAD, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR,
                 gamma=GAMMA, tau=TAU, reward_scale=REWARD_SCALE):
        self.gamma = gamma
        self.tau = tau
        self.replay_buffer = ReplayBuffer(env)
        self.actions_dim = env.action_space.shape[0]
        self.upper_bound = env.action_space.high[0]
        self.lower_bound = env.action_space.low[0]
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.path_save = path_save
        self.path_load = path_load
        self.reward_scale = reward_scale

        self.actor = Actor(actions_dim=self.actions_dim, name='actor', upper_bound=env.action_space.high)
        self.critic_0 = Critic(name='critic_0')
        self.critic_1 = Critic(name='critic_1')
        self.critic_value = CriticValue(name='value')
        self.critic_target_value = CriticValue(name='target_value')

        self.actor.compile(optimizer=opt.Adam(learning_rate=self.actor_lr))
        self.critic_0.compile(optimizer=opt.Adam(learning_rate=self.critic_lr))
        self.critic_1.compile(optimizer=opt.Adam(learning_rate=self.critic_lr))
        self.critic_value.compile(optimizer=opt.Adam(learning_rate=self.critic_lr))
        self.critic_target_value.compile(optimizer=opt.Adam(learning_rate=self.critic_lr))

        self.critic_target_value.set_weights(self.critic_value.weights)

In the __init__ method, we define all the parameters from the config file that the Agent will use. We define the replay buffer and the 5 networks that we talked about. As always, we set the target network weights equal to the corresponding trained network.

We then have a series of utility methods that we also used in the DDPG code:

    def add_to_replay_buffer(self, state, action, reward, new_state, done):
        self.replay_buffer.add_record(state, action, reward, new_state, done)

    def save(self):
        date_now = time.strftime("%Y%m%d%H%M")
        if not os.path.isdir(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}"):
            os.makedirs(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}")
        self.actor.save_weights(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}/{self.actor.net_name}.h5")
        self.critic_0.save_weights(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}/{self.critic_0.net_name}.h5")
        self.critic_1.save_weights(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}/{self.critic_1.net_name}.h5")
        self.critic_value.save_weights(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}/{self.critic_value.net_name}.h5")
        self.critic_target_value.save_weights(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}/{self.critic_target_value.net_name}.h5")

        self.replay_buffer.save(f"{self.path_save}/save_agent_{ENV_NAME.lower()}_{date_now}")

    def load(self):
        self.actor.load_weights(f"{self.path_load}/{self.actor.net_name}.h5")
        self.critic_0.load_weights(f"{self.path_load}/{self.critic_0.net_name}.h5")
        self.critic_1.load_weights(f"{self.path_load}/{self.critic_1.net_name}.h5")
        self.critic_value.load_weights(f"{self.path_load}/{self.critic_value.net_name}.h5")
        self.critic_target_value.load_weights(f"{self.path_load}/{self.critic_target_value.net_name}.h5")

        self.replay_buffer.load(f"{self.path_load}")

    def get_action(self, observation):
        state = tf.convert_to_tensor([observation])
        actions, _ = self.actor.get_action_log_probs(state, reparameterization_trick=False)
        return actions[0]

Nothing new here, let’s look at the learn method where things get interesting:

    def learn(self):
        if not self.replay_buffer.check_buffer_size():
            return

        state, action, reward, new_state, done = self.replay_buffer.get_minibatch()
        states = tf.convert_to_tensor(state, dtype=tf.float32)
        new_states = tf.convert_to_tensor(new_state, dtype=tf.float32)
        rewards = tf.convert_to_tensor(reward, dtype=tf.float32)
        actions = tf.convert_to_tensor(action, dtype=tf.float32)

        with tf.GradientTape() as tape:
            value = tf.squeeze(self.critic_value(states), 1)
            target_value = tf.squeeze(self.critic_target_value(new_states), 1)
            policy_actions, log_probs = self.actor.get_action_log_probs(states, reparameterization_trick=False)
            log_probs = tf.squeeze(log_probs, 1)
            q_value_0 = self.critic_0(states, policy_actions)
            q_value_1 = self.critic_1(states, policy_actions)
            q_value = tf.squeeze(tf.math.minimum(q_value_0, q_value_1), 1)
            value_target = q_value - log_probs
            value_critic_loss = 0.5 * tf.keras.losses.MSE(value, value_target)

        value_critic_gradient = tape.gradient(value_critic_loss, self.critic_value.trainable_variables)
        self.critic_value.optimizer.apply_gradients(zip(value_critic_gradient, self.critic_value.trainable_variables))

        with tf.GradientTape() as tape:
            new_policy_actions, log_probs = self.actor.get_action_log_probs(states, reparameterization_trick=True)
            log_probs = tf.squeeze(log_probs, 1)
            new_q_value_0 = self.critic_0(states, new_policy_actions)
            new_q_value_1 = self.critic_1(states, new_policy_actions)
            new_q_value = tf.squeeze(tf.math.minimum(new_q_value_0, new_q_value_1), 1)

            actor_loss = log_probs - new_q_value
            actor_loss = tf.math.reduce_mean(actor_loss)

        actor_gradient = tape.gradient(actor_loss, self.actor.trainable_variables)
        self.actor.optimizer.apply_gradients(zip(actor_gradient, self.actor.trainable_variables))

        with tf.GradientTape(persistent=True) as tape:
            q_pred = self.reward_scale * reward + self.gamma * target_value * (1 - done)
            old_q_value_0 = tf.squeeze(self.critic_0(state, action), 1)
            old_q_value_1 = tf.squeeze(self.critic_1(state, action), 1)
            critic_0_loss = 0.5 * tf.keras.losses.MSE(old_q_value_0, q_pred)
            critic_1_loss = 0.5 * tf.keras.losses.MSE(old_q_value_1, q_pred)

        critic_0_network_gradient = tape.gradient(critic_0_loss, self.critic_0.trainable_variables)
        critic_1_network_gradient = tape.gradient(critic_1_loss, self.critic_1.trainable_variables)

        self.critic_0.optimizer.apply_gradients(zip(critic_0_network_gradient, self.critic_0.trainable_variables))
        self.critic_1.optimizer.apply_gradients(zip(critic_1_network_gradient, self.critic_1.trainable_variables))

        self.update_target_networks(tau=self.tau)

We have four networks to train:

  • Critic Value
  • Actor
  • Critic Q-value 0
  • Critic Q-value 1

Critic Value

with tf.GradientTape() as tape:
    value = tf.squeeze(self.critic_value(states), 1)
    target_value = tf.squeeze(self.critic_target_value(new_states), 1)
    policy_actions, log_probs = self.actor.get_action_log_probs(states, reparameterization_trick=False)
    log_probs = tf.squeeze(log_probs, 1)
    q_value_0 = self.critic_0(states, policy_actions)
    q_value_1 = self.critic_1(states, policy_actions)
    q_value = tf.squeeze(tf.math.minimum(q_value_0, q_value_1), 1)
    value_target = q_value - log_probs
    value_critic_loss = 0.5 * tf.keras.losses.MSE(value, value_target)

value_critic_gradient = tape.gradient(value_critic_loss, self.critic_value.trainable_variables)
self.critic_value.optimizer.apply_gradients(zip(value_critic_gradient, self.critic_value.trainable_variables))

We take the prediction of the value network and of its corresponding target network. Then we run the actor’s policy and get the Q-value from the Q-value networks using the clipped double-Q trick that we talked about. We define the target value as the difference between the Q-value and the log probabilities. Finally, the loss is the mean squared error between this target and the value predicted by the value network.

Actor

with tf.GradientTape() as tape:
    new_policy_actions, log_probs = self.actor.get_action_log_probs(states, reparameterization_trick=True)
    log_probs = tf.squeeze(log_probs, 1)
    new_q_value_0 = self.critic_0(states, new_policy_actions)
    new_q_value_1 = self.critic_1(states, new_policy_actions)
    new_q_value = tf.squeeze(tf.math.minimum(new_q_value_0, new_q_value_1), 1)

    actor_loss = log_probs - new_q_value
    actor_loss = tf.math.reduce_mean(actor_loss)

actor_gradient = tape.gradient(actor_loss, self.actor.trainable_variables)
self.actor.optimizer.apply_gradients(zip(actor_gradient, self.actor.trainable_variables))

As before, we get the actions and log probabilities from the actor and the Q-values from the two Critics, and we define the loss as the difference between the log probabilities and the Q-value. Notice that we want to maximize Q-value − log(π), so we define the loss as that objective multiplied by −1.

Critic Q-value

with tf.GradientTape(persistent=True) as tape:
    q_pred = self.reward_scale * reward + self.gamma * target_value * (1 - done)
    old_q_value_0 = tf.squeeze(self.critic_0(state, action), 1)
    old_q_value_1 = tf.squeeze(self.critic_1(state, action), 1)
    critic_0_loss = 0.5 * tf.keras.losses.MSE(old_q_value_0, q_pred)
    critic_1_loss = 0.5 * tf.keras.losses.MSE(old_q_value_1, q_pred)

critic_0_network_gradient = tape.gradient(critic_0_loss, self.critic_0.trainable_variables)
critic_1_network_gradient = tape.gradient(critic_1_loss, self.critic_1.trainable_variables)

self.critic_0.optimizer.apply_gradients(zip(critic_0_network_gradient, self.critic_0.trainable_variables))
self.critic_1.optimizer.apply_gradients(zip(critic_1_network_gradient, self.critic_1.trainable_variables))

We need to compute gradients of two losses over the same computation for the Critic Q-value networks, so we use a persistent gradient tape. We define the target Q-value as the scaled reward at the current step plus the discounted value predicted by the Target Critic Value network for the next state. Here we see the reward scale which, as said before, plays the role of the inverse of the entropy temperature. The loss for each of the two networks is the mean squared error between this target and its prediction.

Training loop and results

We can now see the training loop and the results.

config = dict(
    learning_rate_actor = ACTOR_LR,
    learning_rate_critic = CRITIC_LR,
    batch_size = BATCH_SIZE,
    architecture = "SAC",
    infra = "Colab",
    env = ENV_NAME
)

wandb.init(
    project=f"tensorflow2_sac_{ENV_NAME.lower()}",
    tags=["SAC", "FCL", "RL"],
    config=config,
)

env = gym.make(ENV_NAME)
agent = Agent(env)
scores = []
evaluation = True

if PATH_LOAD is not None:
    print("loading weights")
    observation = env.reset()
    # Run a forward pass through each network to build the weights before loading
    action, log_probs = agent.actor.get_action_log_probs(observation[None, :], False)
    agent.actor(observation[None, :])
    agent.critic_0(observation[None, :], action)
    agent.critic_1(observation[None, :], action)
    agent.critic_value(observation[None, :])
    agent.critic_target_value(observation[None, :])
    agent.load()
    print(agent.replay_buffer.buffer_counter)
    print(agent.replay_buffer.n_games)

We configure wandb for logging, and we load any pre-trained agent. Note that we first run a forward pass through each network so that the weights are built before calling load.

In the end, we have the training loop:

for _ in tqdm(range(MAX_GAMES)):
    start_time = time.time()
    states = env.reset()
    done = False
    score = 0
    while not done:
        action = agent.get_action(states)
        new_states, reward, done, info = env.step(action)
        score += reward
        agent.add_to_replay_buffer(states, action, reward, new_states, done)
        agent.learn()
        states = new_states

    scores.append(score)
    agent.replay_buffer.update_n_games()

    wandb.log({'Game number': agent.replay_buffer.n_games,
               '# Episodes': agent.replay_buffer.buffer_counter,
               "Average reward": round(np.mean(scores[-10:]), 2),
               "Time taken": round(time.time() - start_time, 2)})

    if (_ + 1) % SAVE_FREQUENCY == 0:
        print("saving...")
        agent.save()
        print("saved")

The training loop is always the same: we interact with the environment; store the states, actions, rewards, and terminal flags in the replay buffer; and train the agent using the learn() method seen before. When an episode is over, we log the mean reward of the last ten games, and we save the networks and the replay buffer every SAVE_FREQUENCY games (set to 200 in the config file).

Looking at the results:


As we can see, the average rewards start to increase after 1k interactions. This took a while, but we can see how the agent started learning to walk:

We never assign a score on the style…

Ok, the guy can stand and do a little walking, but as you can see, there’s much more to learn. At this point, it’s just a matter of training time.

You can find the original article on my blog and all the code on my GitHub. For any questions, you can reach me through LinkedIn.

If you enjoyed this article, share it with your friends and colleagues! I’ll see you in the next post. In the meantime, take care, stay safe, and remember don’t be another brick in the wall.

Anton.ai