Chapter 16 – Reinforcement Learning
This notebook contains all the sample code and solutions to the exercises in chapter 16.
Note: there may be minor differences between the output of this notebook and the examples shown in the book. You can safely ignore these differences. They are mainly due to the fact that most of the environments provided by OpenAI gym have some randomness.
In this notebook we will be using OpenAI gym, a great toolkit for developing and comparing Reinforcement Learning algorithms. It provides many environments for your learning agents to interact with. Let's start by importing gym:
import gym
Next we will load the MsPacman environment, version 0.
env = gym.make('MsPacman-v0')
Let's initialize the environment by calling its reset() method. This returns an observation:
obs = env.reset()
Observations vary depending on the environment. In this case it is an RGB image represented as a 3D NumPy array of shape [height, width, channels] (with 3 channels: red, green and blue). In other environments reset() may return different objects, as we will see later.
(210, 160, 3)
An environment can be visualized by calling its render() method, and you can pick the rendering mode (the rendering options depend on the environment). In this example we will set mode="rgb_array" to get an image of the environment as a NumPy array:
img = env.render(mode="rgb_array")
Let's plot this image:
Welcome back to the 1980s! :)
In this environment, the rendered image is simply equal to the observation (but in many environments this is not the case):
(img == obs).all()
Let's create a little helper function to plot an environment:
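import matplotlib.pyplot as plt

def plot_environment(env, figsize=(5,4)):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    plt.figure(figsize=figsize)
    img = env.render(mode="rgb_array")
    plt.imshow(img)
    plt.axis("off")
    plt.show()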
Let's see how to interact with an environment. Your agent will need to select an action from an "action space" (the set of possible actions). Let's see what this environment's action space looks like:
Discrete(9) means that the possible actions are integers 0 through 8, which represent the 9 possible positions of the joystick (0=center, 1=up, 2=right, 3=left, 4=down, 5=upper-right, 6=upper-left, 7=lower-right, 8=lower-left).
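To keep the integer codes readable while experimenting, the mapping listed above can be written down as a small lookup table. This is only a convenience sketch: ACTION_NAMES and describe_action() are hypothetical helpers introduced here, not part of gym.

```python
# Joystick positions for the 9 discrete actions, in the order given in the text.
# ACTION_NAMES and describe_action are hypothetical helpers, not part of gym.
ACTION_NAMES = [
    "center", "up", "right", "left", "down",
    "upper-right", "upper-left", "lower-right", "lower-left",
]

def describe_action(action):
    """Return a human-readable name for an integer action in 0..8."""
    if not 0 <= action < len(ACTION_NAMES):
        raise ValueError("action must be an integer from 0 to 8")
    return ACTION_NAMES[action]

print(describe_action(3))  # left
print(describe_action(8))  # lower-left
```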
Next we need to tell the environment which action to play, and it will compute the next step of the game. Let's go left for 110 steps, then lower left for 40 steps:
for step in range(110):
    env.step(3)  # left
for step in range(40):
    env.step(8)  # lower-left
Where are we now?
The step() function actually returns several important objects:
obs, reward, done, info = env.step(0)
The observation tells the agent what the environment looks like, as discussed earlier. This is a 210x160 RGB image:
(210, 160, 3)
The environment also tells the agent how much reward it got during the last step:
When the game is over, the environment returns done=True.
Finally, info is an environment-specific dictionary that can provide some extra information about the internal state of the environment. This is useful for debugging, but your agent should not use this information for learning (it would be cheating).
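The reset()/step() contract described above can be tried out without installing any game. The sketch below uses a toy stand-in environment; CountdownEnv and run_episode_random() are hypothetical names made up for illustration, and only the shape of the interface (an observation from reset(), and an (obs, reward, done, info) tuple from step()) mirrors what the text describes.

```python
import random

class CountdownEnv:
    """Toy stand-in for a gym environment (hypothetical, for illustration only).

    An episode lasts n_steps steps. Action 1 earns a reward of 1.0, action 0 earns nothing.
    """
    def __init__(self, n_steps=5):
        self.n_steps = n_steps
        self.remaining = n_steps

    def reset(self):
        self.remaining = self.n_steps
        return self.remaining  # the initial observation

    def step(self, action):
        self.remaining -= 1
        reward = 1.0 if action == 1 else 0.0
        done = self.remaining == 0
        info = {"remaining": self.remaining}  # for debugging only, not for learning
        return self.remaining, reward, done, info

def run_episode_random(env, rng):
    """Play one episode with random actions; return (total reward, number of steps)."""
    obs = env.reset()
    total_reward, n_steps = 0.0, 0
    while True:
        action = rng.choice([0, 1])
        obs, reward, done, info = env.step(action)
        total_reward += reward
        n_steps += 1
        if done:
            return total_reward, n_steps

total, steps = run_episode_random(CountdownEnv(n_steps=5), random.Random(42))
print(steps)  # 5
```

The loop has the same structure as the one used with MsPacman: act, read the four returned values, and stop as soon as done is true.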
Let's play one full game (with 3 lives), by moving in random directions for 10 steps at a time, recording each frame:
frames = []

n_max_steps = 1000
n_change_steps = 10

obs = env.reset()
for step in range(n_max_steps):
    img = env.render(mode="rgb_array")
    frames.append(img)  # record the current frame
    if step % n_change_steps == 0:
        action = env.action_space.sample()  # play randomly
    obs, reward, done, info = env.step(action)
    if done:
        break
Now show the animation (it's a bit jittery within Jupyter):
from matplotlib import animation
import matplotlib.pyplot as plt

def update_scene(num, frames, patch):
    patch.set_data(frames[num])  # show frame number `num`
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis("off")
    return animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)

video = plot_animation(frames)
Once you have finished playing with an environment, you should close it to free up resources:
env.close()
To code our first learning agent, we will be using a simpler environment: the Cart-Pole.
The Cart-Pole is a very simple environment composed of a cart that can move left or right, and a pole placed vertically on top of it. The agent must move the cart left or right to keep the pole upright.
env = gym.make("CartPole-v0")
obs = env.reset()
array([-0.03625774, -0.01502138, 0.03556243, -0.03981458])
The observation is a 1D NumPy array composed of 4 floats: they represent the cart's horizontal position, its velocity, the angle of the pole (0 = vertical), and the angular velocity. Let's render the environment... unfortunately we need to fix an annoying rendering issue first.
Some environments (including the Cart-Pole) require access to your display, which opens up a separate window, even if you specify the rgb_array mode. In general you can safely ignore that window. However, if Jupyter is running on a headless server (i.e., without a screen), it will raise an exception. One way to avoid this is to install a fake X server like Xvfb. You can then start Jupyter using the xvfb-run command.
If Jupyter is running on a headless server but you don't want to worry about Xvfb, then you can just use the following rendering function for the Cart-Pole:
from PIL import Image, ImageDraw

try:
    from pyglet.gl import gl_info
    openai_cart_pole_rendering = True   # no problem, let's use OpenAI gym's rendering function
except Exception:
    openai_cart_pole_rendering = False  # probably no X server available, let's use our own rendering function
import numpy as np

def render_cart_pole(env, obs):
    if openai_cart_pole_rendering:
        # use OpenAI gym's rendering function
        return env.render(mode="rgb_array")
    # rendering for the cart pole environment (in case OpenAI gym can't do it)
    img_w = 600
    img_h = 400
    cart_w = img_w // 12
    cart_h = img_h // 15
    pole_len = img_h // 3.5
    pole_w = img_w // 80 + 1
    x_width = 2
    max_ang = 0.2
    bg_col = (255, 255, 255)
    cart_col = 0x000000  # Blue Green Red
    pole_col = 0x669acc  # Blue Green Red

    pos, vel, ang, ang_vel = obs
    img = Image.new('RGB', (img_w, img_h), bg_col)
    draw = ImageDraw.Draw(img)
    cart_x = pos * img_w // x_width + img_w // x_width
    cart_y = img_h * 95 // 100
    top_pole_x = cart_x + pole_len * np.sin(ang)
    top_pole_y = cart_y - cart_h // 2 - pole_len * np.cos(ang)
    draw.line((0, cart_y, img_w, cart_y), fill=0)
    draw.rectangle((cart_x - cart_w // 2, cart_y - cart_h // 2, cart_x + cart_w // 2, cart_y + cart_h // 2), fill=cart_col)  # draw cart
    draw.line((cart_x, cart_y - cart_h // 2, top_pole_x, top_pole_y), fill=pole_col, width=pole_w)  # draw pole
    return np.array(img)
def plot_cart_pole(env, obs):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    img = render_cart_pole(env, obs)
    plt.imshow(img)
    plt.axis("off")
    plt.show()

plot_cart_pole(env, obs)
Now let's look at the action space:
Yep, just two possible actions: accelerate towards the left or towards the right. Let's push the cart left until the pole falls:
obs = env.reset()
while True:
    obs, reward, done, info = env.step(0)
    if done:
        break

plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, obs)
plt.imshow(img)
plt.axis("off")
Notice that the game is over when the pole tilts too much, not when it actually falls. Now let's reset the environment and push the cart to the right instead:
obs = env.reset()
while True:
    obs, reward, done, info = env.step(1)
    if done:
        break

plot_cart_pole(env, obs)
Looks like it's doing what we're telling it to do. Now how can we make the pole remain upright? We will need to define a policy for that. This is the strategy that the agent will use to select an action at each step. It can use all the past actions and observations to decide what to do.
Let's hard code a simple strategy: if the pole is tilting to the left, then push the cart to the left, and vice versa. Let's see if that works:
frames = []

n_max_steps = 1000
n_change_steps = 10

obs = env.reset()
for step in range(n_max_steps):
    img = render_cart_pole(env, obs)
    frames.append(img)

    # hard-coded policy
    position, velocity, angle, angular_velocity = obs
    if angle < 0:
        action = 0
    else:
        action = 1

    obs, reward, done, info = env.step(action)
    if done:
        break

video = plot_animation(frames)
Nope, the system is unstable and after just a few wobbles, the pole ends up too tilted: game over. We will need to be smarter than that!
Let's create a neural network that will take observations as inputs, and output the action to take for each observation. To choose an action, the network will first estimate a probability for each action, then select an action randomly according to the estimated probabilities. In the case of the Cart-Pole environment, there are just two possible actions (left or right), so we only need one output neuron: it will output the probability p of the action 0 (left), and of course the probability of action 1 (right) will be 1 - p.
Note: instead of using the fully_connected() function from the tensorflow.contrib.layers module (as in the book), we now use the dense() function from the tf.layers module, which did not exist when this chapter was written. This is preferable because anything in contrib may change or be deleted without notice, while tf.layers is part of the official API. As you will see, the code is mostly the same.
The main differences relevant to this chapter are:
- the _fn suffix was removed in all the parameters that had it (for example, the activation_fn parameter was renamed to activation),
- the weights prefix was renamed to kernel (for example, weights_initializer became kernel_initializer),
- the default activation is None instead of tf.nn.relu.
import tensorflow as tf
# 1. Specify the network architecture
n_inputs = 4 # == env.observation_space.shape[0]
n_hidden = 4 # it's a simple task, we don't need more than this
n_outputs = 1 # only outputs the probability of accelerating left
initializer = tf.contrib.layers.variance_scaling_initializer()
# 2. Build the neural network
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = tf.layers.dense(X, n_hidden, activation=tf.nn.elu,
                         kernel_initializer=initializer)
outputs = tf.layers.dense(hidden, n_outputs, activation=tf.nn.sigmoid,
                          kernel_initializer=initializer)
# 3. Select a random action based on the estimated probabilities
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
init = tf.global_variables_initializer()
In this particular environment, the past actions and observations can safely be ignored, since each observation contains the environment's full state. If there were some hidden state then you may need to consider past actions and observations in order to try to infer the hidden state of the environment. For example, if the environment only revealed the position of the cart but not its velocity, you would have to consider not only the current observation but also the previous observation in order to estimate the current velocity. Another example is if the observations are noisy: you may want to use the past few observations to estimate the most likely current state. Our problem is thus as simple as can be: the current observation is noise-free and contains the environment's full state.
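As a rough illustration of the two cases mentioned above, here is a minimal sketch of how past observations could be used. Both helpers are hypothetical and are not used elsewhere in this notebook; the time step dt=0.02 seconds between observations is an assumption made for the example.

```python
def estimate_velocity(prev_position, position, dt=0.02):
    """Finite-difference velocity estimate from two consecutive positions.

    dt is the time between two observations (0.02 s is an assumed value).
    """
    return (position - prev_position) / dt

def smooth(recent_observations):
    """Average the last few noisy readings of a quantity to reduce the noise."""
    return sum(recent_observations) / len(recent_observations)

# The cart moved from x=0.10 to x=0.12 in one step, so it travels at about 1 unit per second.
velocity = estimate_velocity(0.10, 0.12)

# Three noisy readings of the same angle, averaged into one estimate.
angle = smooth([0.031, 0.029, 0.030])
```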
You may wonder why we are picking a random action based on the probability given by the policy network, rather than just picking the action with the highest probability. This approach lets the agent find the right balance between exploring new actions and exploiting the actions that are known to work well. Here's an analogy: suppose you go to a restaurant for the first time, and all the dishes look equally appealing so you randomly pick one. If it turns out to be good, you can increase the probability of ordering it next time, but you shouldn't increase that probability to 100%, or else you will never try out the other dishes, some of which may be even better than the one you tried.
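The difference between the two selection rules is easy to see in a few lines of plain Python. This is a sketch with made-up numbers: p_left = 0.7 stands in for whatever probability the policy network outputs, and sample_action() and greedy_action() are hypothetical helper names.

```python
import random

def sample_action(p_left, rng):
    """Pick action 0 (left) with probability p_left, otherwise action 1 (right)."""
    return 0 if rng.random() < p_left else 1

def greedy_action(p_left):
    """Always pick the most probable action (no exploration)."""
    return 0 if p_left >= 0.5 else 1

rng = random.Random(0)  # fixed seed so the experiment is repeatable
p_left = 0.7            # assumed network output for some observation

sampled = [sample_action(p_left, rng) for _ in range(10000)]
frac_left = sampled.count(0) / len(sampled)
greedy = [greedy_action(p_left) for _ in range(10000)]

print("sampled: fraction of left moves = {:.2f}".format(frac_left))
print("greedy:  distinct actions tried =", sorted(set(greedy)))
```

With sampling, the agent goes left roughly 70% of the time but still tries going right regularly. The greedy rule only ever tries one action, so it can never discover that the other one might be better.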
Let's randomly initialize this policy neural network and use it to play one game:
n_max_steps = 1000
frames = []

with tf.Session() as sess:
    init.run()  # randomly initialize the network's variables
    obs = env.reset()
    for step in range(n_max_steps):
        img = render_cart_pole(env, obs)
        frames.append(img)
        action_val = action.eval(feed_dict={X: obs.reshape(1, n_inputs)})
        obs, reward, done, info = env.step(action_val[0][0])
        if done:
            break
Now let's look at how well this randomly initialized policy network performed:
video = plot_animation(frames)
Yeah... pretty bad. The neural network will have to learn to do better. First let's see if it is capable of learning the basic policy we used earlier: go left if the pole is tilting left, and go right if it is tilting right. The following code defines the same neural network but we add the target probabilities y, and the training operations (cross_entropy, optimizer and training_op):
import tensorflow as tf
n_inputs = 4
n_hidden = 4
n_outputs = 1
learning_rate = 0.01
initializer = tf.contrib.layers.variance_scaling_initializer()
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
y = tf.placeholder(tf.float32, shape=[None, n_outputs])
hidden = tf.layers.dense(X, n_hidden, activation=tf.nn.elu, kernel_initializer=initializer)
logits = tf.layers.dense(hidden, n_outputs)
outputs = tf.nn.sigmoid(logits) # probability of action 0 (left)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
training_op = optimizer.minimize(cross_entropy)
init = tf.global_variables_initializer()
saver = tf.train.Saver()
We can make the same net play in 10 different environments in parallel, and train for 1000 iterations. We also reset environments when they are done.
n_environments = 10
n_iterations = 1000

envs = [gym.make("CartPole-v0") for _ in range(n_environments)]
observations = [env.reset() for env in envs]

with tf.Session() as sess:
    init.run()
    for iteration in range(n_iterations):
        target_probas = np.array([([1.] if obs[2] < 0 else [0.]) for obs in observations])  # if angle<0 we want proba(left)=1., or else proba(left)=0.
        action_val, _ = sess.run([action, training_op], feed_dict={X: np.array(observations), y: target_probas})
        for env_index, env in enumerate(envs):
            obs, reward, done, info = env.step(action_val[env_index][0])
            observations[env_index] = obs if not done else env.reset()
    saver.save(sess, "./my_policy_net_basic.ckpt")

for env in envs:
    env.close()
def render_policy_net(model_path, action, X, n_max_steps=1000):
    frames = []
    env = gym.make("CartPole-v0")
    obs = env.reset()
    with tf.Session() as sess:
        saver.restore(sess, model_path)
        for step in range(n_max_steps):
            img = render_cart_pole(env, obs)
            frames.append(img)
            action_val = action.eval(feed_dict={X: obs.reshape(1, n_inputs)})
            obs, reward, done, info = env.step(action_val[0][0])
            if done:
                break
    env.close()
    return frames

frames = render_policy_net("./my_policy_net_basic.ckpt", action, X)
video = plot_animation(frames)
Looks like it learned the policy correctly. Now let's see if it can learn a better policy on its own.
To train this neural network we will need to define the target probabilities y. If an action is good we should increase its probability, and conversely if it is bad we should reduce it. But how do we know whether an action is good or bad? The problem is that most actions have delayed effects, so when you win or lose points in a game, it is not clear which actions contributed to this result: was it just the last action? Or the last 10? Or just one action 50 steps earlier? This is called the credit assignment problem.
The Policy Gradients algorithm tackles this problem by first playing multiple games, then making the actions in good games slightly more likely, while actions in bad games are made slightly less likely. First we play, then we go back and think about what we did.
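One common way to score each action, and the one that the discount_rewards() function in this notebook implements, is to sum all the rewards that come after it, applying a discount rate at each step. The notation below ($R_t$ for the score of the action taken at step $t$, $r_t$ for the reward received at step $t$, $\gamma$ for the discount rate, $T$ for the last step) is introduced here for illustration:

```latex
R_t = \sum_{k=0}^{T-t} \gamma^{k}\, r_{t+k}
\qquad\text{or, equivalently,}\qquad
R_t = r_t + \gamma\, R_{t+1}, \quad R_{T+1} = 0

% Worked example with rewards (10, 0, -50) and \gamma = 0.8:
R_2 = -50, \qquad
R_1 = 0 + 0.8 \times (-50) = -40, \qquad
R_0 = 10 + 0.8 \times (-40) = -22
```

With a discount rate close to 1, rewards far in the future count almost as much as immediate ones. With a rate close to 0, only the immediate reward matters.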
import tensorflow as tf
n_inputs = 4
n_hidden = 4
n_outputs = 1
learning_rate = 0.01
initializer = tf.contrib.layers.variance_scaling_initializer()
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = tf.layers.dense(X, n_hidden, activation=tf.nn.elu, kernel_initializer=initializer)
logits = tf.layers.dense(hidden, n_outputs)
outputs = tf.nn.sigmoid(logits) # probability of action 0 (left)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
y = 1. - tf.to_float(action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
gradients = [grad for grad, variable in grads_and_vars]
gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
    gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape())
    gradient_placeholders.append(gradient_placeholder)
    grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)
init = tf.global_variables_initializer()
saver = tf.train.Saver()
def discount_rewards(rewards, discount_rate):
    discounted_rewards = np.zeros(len(rewards))
    cumulative_rewards = 0
    for step in reversed(range(len(rewards))):
        cumulative_rewards = rewards[step] + cumulative_rewards * discount_rate
        discounted_rewards[step] = cumulative_rewards
    return discounted_rewards

def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate) for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean)/reward_std for discounted_rewards in all_discounted_rewards]
discount_rewards([10, 0, -50], discount_rate=0.8)
array([-22., -40., -50.])
discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8)
[array([-0.28435071, -0.86597718, -1.18910299]),
array([ 1.26665318, 1.0727777 ])]
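The normalized values above can be checked with nothing but the standard library. The sketch below re-implements the same two steps (discounting, then normalizing over all games at once) without NumPy. The helper name discount() is hypothetical, and pstdev is used because NumPy's std() computes the population standard deviation by default.

```python
from statistics import mean, pstdev

def discount(rewards, rate):
    """Discounted sum of future rewards at each step, computed backwards."""
    out, running = [], 0.0
    for r in reversed(rewards):
        running = r + rate * running
        out.append(running)
    return out[::-1]

games = [[10, 0, -50], [10, 20]]
discounted = [discount(g, 0.8) for g in games]   # scores per game, before normalization
flat = [x for g in discounted for x in g]        # all scores of all games together
mu, sigma = mean(flat), pstdev(flat)             # population std, like NumPy's default
normalized = [[(x - mu) / sigma for x in g] for g in discounted]

for game_scores in normalized:
    print([round(x, 4) for x in game_scores])
```

After normalization the scores have zero mean and unit standard deviation across all games, so actions with a positive score did better than average and actions with a negative score did worse.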
env = gym.make("CartPole-v0")

n_games_per_update = 10
n_max_steps = 1000
n_iterations = 250
save_iterations = 10
discount_rate = 0.95

with tf.Session() as sess:
    init.run()
    for iteration in range(n_iterations):
        print("\rIteration: {}".format(iteration), end="")
        all_rewards = []    # one list of rewards per game
        all_gradients = []  # one list of gradients per game
        for game in range(n_games_per_update):
            current_rewards = []
            current_gradients = []
            obs = env.reset()
            for step in range(n_max_steps):
                action_val, gradients_val = sess.run([action, gradients], feed_dict={X: obs.reshape(1, n_inputs)})
                obs, reward, done, info = env.step(action_val[0][0])
                current_rewards.append(reward)
                current_gradients.append(gradients_val)
                if done:
                    break
            all_rewards.append(current_rewards)
            all_gradients.append(current_gradients)

        all_rewards = discount_and_normalize_rewards(all_rewards, discount_rate=discount_rate)
        feed_dict = {}
        for var_index, gradient_placeholder in enumerate(gradient_placeholders):
            mean_gradients = np.mean([reward * all_gradients[game_index][step][var_index]
                                      for game_index, rewards in enumerate(all_rewards)
                                          for step, reward in enumerate(rewards)], axis=0)
            feed_dict[gradient_placeholder] = mean_gradients
        sess.run(training_op, feed_dict=feed_dict)
        if iteration % save_iterations == 0:
            saver.save(sess, "./my_policy_net_pg.ckpt")
Iteration: 249
frames = render_policy_net("./my_policy_net_pg.ckpt", action, X, n_max_steps=1000)
video = plot_animation(frames)
import numpy.random as rnd

transition_probabilities = [
        [0.7, 0.2, 0.0, 0.1],  # from s0 to s0, s1, s2, s3
        [0.0, 0.0, 0.9, 0.1],  # from s1 to ...
        [0.0, 1.0, 0.0, 0.0],  # from s2 to ...
        [0.0, 0.0, 0.0, 1.0],  # from s3 to ...
    ]

n_max_steps = 50

def print_sequence(start_state=0):
    current_state = start_state
    print("States:", end=" ")
    for step in range(n_max_steps):
        print(current_state, end=" ")
        if current_state == 3:
            break  # s3 is a terminal state
        current_state = rnd.choice(range(4), p=transition_probabilities[current_state])
    else:
        print("...", end="")  # the step limit was reached before the terminal state
    print()

for _ in range(10):
    print_sequence()
States: 0 0 3
States: 0 1 2 1 2 1 2 1 2 1 3
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3
States: 0 3
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3
States: 0 1 3
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 ...
States: 0 0 3
States: 0 0 0 1 2 1 2 1 3
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3
transition_probabilities = [
        [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],  # in s0, if action a0 then proba 0.7 to state s0 and 0.3 to state s1, etc.
        [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
        [None, [0.8, 0.1, 0.1], None],
    ]

rewards = [
        [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
        [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
        [[0, 0, 0], [+40, 0, 0], [0, 0, 0]],
    ]

possible_actions = [[0, 1, 2], [0, 2], [1]]

def policy_fire(state):
    return [0, 2, 1][state]

def policy_random(state):
    return rnd.choice(possible_actions[state])

def policy_safe(state):
    return [0, 0, 1][state]
class MDPEnvironment(object):
    def __init__(self, start_state=0):
        self.start_state = start_state
        self.reset()
    def reset(self):
        self.total_rewards = 0
        self.state = self.start_state
    def step(self, action):
        next_state = rnd.choice(range(3), p=transition_probabilities[self.state][action])
        reward = rewards[self.state][action][next_state]
        self.state = next_state
        self.total_rewards += reward
        return self.state, reward
def run_episode(policy, n_steps, start_state=0, display=True):
    env = MDPEnvironment(start_state)
    if display:
        print("States (+rewards):", end=" ")
    for step in range(n_steps):
        if display:
            if step == 10:
                print("...", end=" ")
            elif step < 10:
                print(env.state, end=" ")
        action = policy(env.state)
        state, reward = env.step(action)
        if display and step < 10:
            if reward:
                print("({})".format(reward), end=" ")
    if display:
        print("Total rewards =", env.total_rewards)
    return env.total_rewards
for policy in (policy_fire, policy_random, policy_safe):
    all_totals = []
    for episode in range(1000):
        all_totals.append(run_episode(policy, n_steps=100, display=(episode<5)))
    print("Summary: mean={:.1f}, std={:1f}, min={}, max={}".format(np.mean(all_totals), np.std(all_totals), np.min(all_totals), np.max(all_totals)))
States (+rewards): 0 (10) 0 (10) 0 1 (-50) 2 2 2 (40) 0 (10) 0 (10) 0 (10) ... Total rewards = 210
States (+rewards): 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 1 (-50) 2 2 (40) 0 (10) ... Total rewards = 70
States (+rewards): 0 (10) 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) ... Total rewards = 70
States (+rewards): 0 1 (-50) 2 1 (-50) 2 (40) 0 (10) 0 1 (-50) 2 (40) 0 ... Total rewards = -10
States (+rewards): 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 1 (-50) 2 (40) 0 (10) 0 (10) ... Total rewards = 290
Summary: mean=121.1, std=129.333766, min=-330, max=470
States (+rewards): 0 1 (-50) 2 1 (-50) 2 (40) 0 1 (-50) 2 2 (40) 0 ... Total rewards = -60
States (+rewards): 0 (10) 0 0 0 0 0 (10) 0 0 0 (10) 0 ... Total rewards = -30
States (+rewards): 0 1 1 (-50) 2 (40) 0 0 1 1 1 1 ... Total rewards = 10
States (+rewards): 0 (10) 0 (10) 0 0 0 0 1 (-50) 2 (40) 0 0 ... Total rewards = 0
States (+rewards): 0 0 (10) 0 1 (-50) 2 (40) 0 0 0 0 (10) 0 (10) ... Total rewards = 40
Summary: mean=-22.1, std=88.152740, min=-380, max=200
States (+rewards): 0 1 1 1 1 1 1 1 1 1 ... Total rewards = 0
States (+rewards): 0 1 1 1 1 1 1 1 1 1 ... Total rewards = 0
States (+rewards): 0 (10) 0 (10) 0 (10) 0 1 1 1 1 1 1 ... Total rewards = 30
States (+rewards): 0 (10) 0 1 1 1 1 1 1 1 1 ... Total rewards = 10
States (+rewards): 0 1 1 1 1 1 1 1 1 1 ... Total rewards = 0
Summary: mean=22.3, std=26.244312, min=0, max=170
Q-Learning will learn the optimal policy by watching the random policy play.
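The update applied after each observed transition can be written as follows. It matches the update line in the code that follows, with $\alpha$ the learning rate (alpha), $\gamma$ the discount rate (gamma), $r$ the reward received and $s'$ the state reached after playing action $a$ in state $s$:

```latex
Q(s, a) \;\leftarrow\; (1 - \alpha)\, Q(s, a) \;+\; \alpha \left( r + \gamma \max_{a'} Q(s', a') \right)
```

The actions are chosen by the random policy, but the target uses the maximum over next actions, that is, the value of acting greedily from $s'$ onwards. This is why the Q-values can converge towards those of the optimal policy even though that policy is never actually played during learning.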
n_states = 3
n_actions = 3
n_steps = 20000
alpha = 0.01
gamma = 0.99
exploration_policy = policy_random
q_values = np.full((n_states, n_actions), -np.inf)  # -inf for impossible actions
for state, actions in enumerate(possible_actions):
    q_values[state, actions] = 0  # initial value = 0.0, for all possible actions

env = MDPEnvironment()
for step in range(n_steps):
    action = exploration_policy(env.state)
    state = env.state
    next_state, reward = env.step(action)
    next_value = np.max(q_values[next_state])  # greedy policy
    q_values[state, action] = (1-alpha)*q_values[state, action] + alpha*(reward + gamma * next_value)

def optimal_policy(state):
    return np.argmax(q_values[state])
array([[ 39.13508139, 38.88079412, 35.23025716],
[ 18.9117071 , -inf, 20.54567816],
[ -inf, 72.53192111, -inf]])
all_totals = []
for episode in range(1000):
    all_totals.append(run_episode(optimal_policy, n_steps=100, display=(episode<5)))
print("Summary: mean={:.1f}, std={:1f}, min={}, max={}".format(np.mean(all_totals), np.std(all_totals), np.min(all_totals), np.max(all_totals)))
States (+rewards): 0 (10) 0 (10) 0 1 (-50) 2 (40) 0 (10) 0 1 (-50) 2 (40) 0 (10) ... Total rewards = 230
States (+rewards): 0 (10) 0 (10) 0 (10) 0 1 (-50) 2 2 1 (-50) 2 (40) 0 (10) ... Total rewards = 90
States (+rewards): 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) ... Total rewards = 170
States (+rewards): 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) 0 (10) ... Total rewards = 220
States (+rewards): 0 1 (-50) 2 (40) 0 (10) 0 1 (-50) 2 (40) 0 (10) 0 (10) 0 (10) ... Total rewards = -50
Summary: mean=125.6, std=127.363464, min=-290, max=500
env = gym.make("MsPacman-v0")
obs = env.reset()
(210, 160, 3)
Preprocessing the images is optional but greatly speeds up training.
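A quick back-of-the-envelope computation shows where the speed-up comes from. It only uses the shapes that appear in this notebook (210×160 RGB frames, cropped with obs[1:176:2, ::2] and converted to greyscale); the variable names are made up for this sketch.

```python
# Number of values the network would receive per raw frame (height x width x channels).
raw_values = 210 * 160 * 3

# Rows and columns kept by the slicing obs[1:176:2, ::2].
kept_rows = len(range(1, 176, 2))   # every other row from 1 to 175
kept_cols = len(range(0, 160, 2))   # every other column

# Greyscale means a single channel, so one value per kept pixel.
preprocessed_values = kept_rows * kept_cols
reduction = raw_values / preprocessed_values

print(kept_rows, kept_cols)         # 88 80
print("about {:.1f}x fewer inputs".format(reduction))
```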
mspacman_color = np.array([210, 164, 74]).mean()

def preprocess_observation(obs):
    img = obs[1:176:2, ::2]  # crop and downsize
    img = img.mean(axis=2)  # to greyscale
    img[img==mspacman_color] = 0  # improve contrast
    img = (img - 128) / 128  # normalize from -1. to 1.
    return img.reshape(88, 80, 1)
img = preprocess_observation(obs)

plt.figure(figsize=(11, 7))
plt.subplot(121)
plt.title("Original observation (160×210 RGB)")
plt.imshow(obs)
plt.axis("off")
plt.subplot(122)
plt.title("Preprocessed observation (88×80 greyscale)")
plt.imshow(img.reshape(88, 80), interpolation="nearest", cmap="gray")
plt.axis("off")
plt.show()
Note: instead of using tf.contrib.layers.convolution2d() or tf.contrib.layers.conv2d() (as in the book), we now use tf.layers.conv2d(), which did not exist when this chapter was written. This is preferable because anything in contrib may change or be deleted without notice, while tf.layers is part of the official API. As you will see, the code is mostly the same, except that the parameter names have changed slightly:
- the num_outputs parameter was renamed to filters,
- the stride parameter was renamed to strides,
- the _fn suffix was removed from parameter names that had it (e.g., activation_fn was renamed to activation),
- the weights_initializer parameter was renamed to kernel_initializer,
- the weights variable was renamed to "kernel" (instead of "weights"), and the biases variable was renamed from "biases" to "bias",
- and the default activation is now None instead of tf.nn.relu.
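The network defined next flattens the output of its last convolutional layer, so it needs to know that layer's size in advance. With "SAME" padding, each convolutional layer outputs ceil(input_size / stride) values along each spatial dimension, which makes the size easy to check by hand. The helper below is a hypothetical sketch of that arithmetic, using the 88×80 input, the strides 4, 2, 1 and the 64 feature maps from this notebook.

```python
import math

def same_padding_output_size(size, stride):
    """Spatial output size of a convolution with "SAME" padding."""
    return math.ceil(size / stride)

height, width = 88, 80          # preprocessed observation size
for stride in [4, 2, 1]:        # strides of the three convolutional layers
    height = same_padding_output_size(height, stride)
    width = same_padding_output_size(width, stride)

n_maps_last_layer = 64
n_features = n_maps_last_layer * height * width

print(height, width, n_features)  # 11 10 7040
```

This agrees with the value 64 * 11 * 10 used for n_hidden_inputs. Note that the kernel sizes do not affect the result when the padding is "SAME".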
input_height = 88
input_width = 80
input_channels = 1
conv_n_maps = [32, 64, 64]
conv_kernel_sizes = [(8,8), (4,4), (3,3)]
conv_strides = [4, 2, 1]
conv_paddings = ["SAME"]*3
conv_activation = [tf.nn.relu]*3
n_hidden_inputs = 64 * 11 * 10 # conv3 has 64 maps of 11x10 each
n_hidden = 512
hidden_activation = tf.nn.relu
n_outputs = env.action_space.n
initializer = tf.contrib.layers.variance_scaling_initializer()
learning_rate = 0.01
def q_network(X_state, scope):
    prev_layer = X_state
    conv_layers = []
    with tf.variable_scope(scope) as scope:
        for n_maps, kernel_size, strides, padding, activation in zip(conv_n_maps, conv_kernel_sizes, conv_strides, conv_paddings, conv_activation):
            prev_layer = tf.layers.conv2d(prev_layer, filters=n_maps, kernel_size=kernel_size, strides=strides, padding=padding, activation=activation, kernel_initializer=initializer)
            conv_layers.append(prev_layer)
        last_conv_layer_flat = tf.reshape(prev_layer, shape=[-1, n_hidden_inputs])
        hidden = tf.layers.dense(last_conv_layer_flat, n_hidden, activation=hidden_activation, kernel_initializer=initializer)
        outputs = tf.layers.dense(hidden, n_outputs)
    # map each variable's name (with the scope prefix stripped) to the variable itself
    trainable_vars = {var.name[len(scope.name):]: var
                      for var in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope.name)}
    return outputs, trainable_vars
X_state = tf.placeholder(tf.float32, shape=[None, input_height, input_width, input_channels])
actor_q_values, actor_vars = q_network(X_state, scope="q_networks/actor")     # acts
critic_q_values, critic_vars = q_network(X_state, scope="q_networks/critic")  # learns

copy_ops = [actor_var.assign(critic_vars[var_name])
            for var_name, actor_var in actor_vars.items()]
copy_critic_to_actor = tf.group(*copy_ops)

with tf.variable_scope("train"):
    X_action = tf.placeholder(tf.int32, shape=[None])
    y = tf.placeholder(tf.float32, shape=[None, 1])
    q_value = tf.reduce_sum(critic_q_values * tf.one_hot(X_action, n_outputs),
                            axis=1, keep_dims=True)
    cost = tf.reduce_mean(tf.square(y - q_value))
    global_step = tf.Variable(0, trainable=False, name='global_step')
    optimizer = tf.train.AdamOptimizer(learning_rate)
    training_op = optimizer.minimize(cost, global_step=global_step)

init = tf.global_variables_initializer()
saver = tf.train.Saver()
{'/conv2d/bias:0': <tf.Variable 'q_networks/actor/conv2d/bias:0' shape=(32,) dtype=float32_ref>,
'/conv2d/kernel:0': <tf.Variable 'q_networks/actor/conv2d/kernel:0' shape=(8, 8, 1, 32) dtype=float32_ref>,
'/conv2d_1/bias:0': <tf.Variable 'q_networks/actor/conv2d_1/bias:0' shape=(64,) dtype=float32_ref>,
'/conv2d_1/kernel:0': <tf.Variable 'q_networks/actor/conv2d_1/kernel:0' shape=(4, 4, 32, 64) dtype=float32_ref>,
'/conv2d_2/bias:0': <tf.Variable 'q_networks/actor/conv2d_2/bias:0' shape=(64,) dtype=float32_ref>,
'/conv2d_2/kernel:0': <tf.Variable 'q_networks/actor/conv2d_2/kernel:0' shape=(3, 3, 64, 64) dtype=float32_ref>,
'/dense/bias:0': <tf.Variable 'q_networks/actor/dense/bias:0' shape=(512,) dtype=float32_ref>,
'/dense/kernel:0': <tf.Variable 'q_networks/actor/dense/kernel:0' shape=(7040, 512) dtype=float32_ref>,
'/dense_1/bias:0': <tf.Variable 'q_networks/actor/dense_1/bias:0' shape=(9,) dtype=float32_ref>,
'/dense_1/kernel:0': <tf.Variable 'q_networks/actor/dense_1/kernel:0' shape=(512, 9) dtype=float32_ref>}
from collections import deque

replay_memory_size = 10000
replay_memory = deque([], maxlen=replay_memory_size)

def sample_memories(batch_size):
    indices = rnd.permutation(len(replay_memory))[:batch_size]
    cols = [[], [], [], [], []]  # state, action, reward, next_state, continue
    for idx in indices:
        memory = replay_memory[idx]
        for col, value in zip(cols, memory):
            col.append(value)
    cols = [np.array(col) for col in cols]
    return cols[0], cols[1], cols[2].reshape(-1, 1), cols[3], cols[4].reshape(-1, 1)
eps_min = 0.05
eps_max = 1.0
eps_decay_steps = 50000
import sys

def epsilon_greedy(q_values, step):
    epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
    if rnd.rand() < epsilon:
        return rnd.randint(n_outputs)  # random action
    else:
        return np.argmax(q_values)  # optimal action
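To get a feel for the exploration schedule used by epsilon_greedy(), the epsilon formula can be evaluated on its own at a few training steps. The standalone helper epsilon_at() is a hypothetical name introduced for this sketch; its default arguments are the values defined above.

```python
def epsilon_at(step, eps_min=0.05, eps_max=1.0, eps_decay_steps=50000):
    """Probability of playing a random action at a given training step."""
    return max(eps_min, eps_max - (eps_max - eps_min) * step / eps_decay_steps)

schedule = {step: epsilon_at(step) for step in (0, 25000, 50000, 100000)}
for step, epsilon in schedule.items():
    print("step {:>6}: epsilon = {:.3f}".format(step, epsilon))
```

Epsilon decreases linearly from eps_max to eps_min over the first eps_decay_steps training steps, then stays at eps_min. The agent therefore plays almost entirely at random at first and mostly follows its Q-value estimates later on, while still exploring 5% of the time.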
import os

n_steps = 100000  # total number of training steps
training_start = 1000  # start training after 1,000 game iterations
training_interval = 3  # run a training step every 3 game iterations
save_steps = 50  # save the model every 50 training steps
copy_steps = 25  # copy the critic to the actor every 25 training steps
discount_rate = 0.95
skip_start = 90  # skip the start of every game (it's just waiting time)
batch_size = 50
iteration = 0  # game iterations
checkpoint_path = "./my_dqn.ckpt"
done = True  # env needs to be reset

with tf.Session() as sess:
    # TensorFlow saves a checkpoint as several files, including an ".index" file
    if os.path.isfile(checkpoint_path + ".index"):
        saver.restore(sess, checkpoint_path)
    else:
        init.run()
    while True:
        step = global_step.eval()
        if step >= n_steps:
            break
        iteration += 1
        print("\rIteration {}\tTraining step {}/{} ({:.1f}%)".format(iteration, step, n_steps, step * 100 / n_steps), end="")
        if done:  # game over, start again
            obs = env.reset()
            for skip in range(skip_start):  # skip boring game iterations at the start of each game
                obs, reward, done, info = env.step(0)
            state = preprocess_observation(obs)

        # Actor evaluates what to do
        q_values = actor_q_values.eval(feed_dict={X_state: [state]})
        action = epsilon_greedy(q_values, step)

        # Actor plays
        obs, reward, done, info = env.step(action)
        next_state = preprocess_observation(obs)

        # Let's memorize what happened
        replay_memory.append((state, action, reward, next_state, 1.0 - done))
        state = next_state

        if iteration < training_start or iteration % training_interval != 0:
            continue  # not a training iteration

        # Critic learns
        X_state_val, X_action_val, rewards, X_next_state_val, continues = sample_memories(batch_size)
        next_q_values = actor_q_values.eval(feed_dict={X_state: X_next_state_val})
        y_val = rewards + continues * discount_rate * np.max(next_q_values, axis=1, keepdims=True)
        training_op.run(feed_dict={X_state: X_state_val, X_action: X_action_val, y: y_val})

        # Regularly copy critic to actor
        if step % copy_steps == 0:
            copy_critic_to_actor.run()

        # And save regularly
        if step % save_steps == 0:
            saver.save(sess, checkpoint_path)
