In this tutorial we are going to beat the gym game Taxi-v3 using Keras and Q-Learning. The model that we will implement is partly taken from Anirban Sarkar's blog ''Reinforcement Learning for Taxi-v2''. There is no significant difference between Taxi-v2 and Taxi-v3, so it is excellent reading material that explains what we do (just ignore the SARSA part).
For this tutorial we will provide you with a basic agent and a memory structure (we will get to that part). The things that have to be implemented are: your own model (inheriting from the BaseAgent), the training loop, and an epsilon-greedy exploration strategy.
import gym
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding, Reshape
from tensorflow.keras.optimizers import Adam
class BaseAgent:
    def __init__(self, game_name):
        self.game_name = game_name
        self.model = None
        self.memory = None

    def random_average(self, nr_games=500):
        """ Performs a number of random games and returns the average score. """
        env = gym.make(self.game_name)
        collected_scores = []

        for _ in range(nr_games):
            env.reset()
            done = False
            score = 0

            while not done:
                action = env.action_space.sample()
                obs, reward, done, info = env.step(action)
                score += reward

            collected_scores.append(score)

        env.close()
        print(f"\nA random model played: {nr_games} games, with an average score of:"
              f" {sum(collected_scores) / nr_games:5.2f}")
        return sum(collected_scores) / nr_games
    def plot_history(self, history):
        """ Shows the loss and accuracy over time. """
        plt.subplots(1, 2, figsize=(30, 10))

        plt.subplot(1, 2, 1)
        plt.plot(history.history['loss'])
        plt.title("Loss")

        plt.subplot(1, 2, 2)
        plt.plot(history.history['accuracy'])
        plt.title("Accuracy")
        plt.show()
    def evaluate_model(self, model, nr_games=100):
        """ Evaluate the model results. """
        env = gym.make(self.game_name)
        collected_scores = []

        for episode in range(1, nr_games + 1):
            obs = env.reset()
            done = False
            score = 0

            while not done:
                # Get action from model
                model_x = np.array([obs])
                action = np.argmax(model.predict(model_x)[0])

                # update everything
                obs, reward, done, info = env.step(action)
                score += reward

            collected_scores.append(score)
            print(f"\r\tGame {episode:3d}/{nr_games:3d} score: {score}", end='')

        env.close()
        print(f"\n\nThe model played: {nr_games} games, with an average score of:"
              f" {sum(collected_scores) / nr_games:5.2f}")
    @staticmethod
    def create_model(input_shape, output_shape):
        """ Create the model. """
        raise NotImplementedError
class MemoryDeque:
    """
    A base implementation of a deque memory.

    :param size: int
        The maximum size of the memory
    :param to_numpy: bool
        Flag variable to indicate a conversion to numpy is preferred whenever returning a batch.
        This is required whenever the input data is a vector. (e.g. reward = [1])
    """
    _transition = namedtuple('transition', ('state', 'action', 'reward', 'done', 'next_state'))

    def __init__(self, size, to_numpy=True):
        self.size = size
        self.to_numpy = to_numpy
        self.pointer = 0
        self.memory = deque(maxlen=size)

    def __getitem__(self, item):
        return self.memory[item]

    def __len__(self):
        return len(self.memory)

    def is_full(self):
        return len(self.memory) == self.size

    def reset(self):
        self.pointer = 0
        self.memory.clear()

    def get_batch(self, batch_size=32):
        states, actions, rewards, done, states_next = zip(*random.sample(self.memory, batch_size))

        if self.to_numpy:
            states = np.vstack(states)
            actions = np.hstack(actions)
            rewards = np.hstack(rewards)
            done = np.hstack(done)
            states_next = np.vstack(states_next)

        return self._transition(states, actions, rewards, done, states_next)

    def add(self, state, action, reward, done, next_state):
        self.memory.append(self._transition(state, action, reward, done, next_state))
agent = BaseAgent(game_name='Taxi-v3')
agent.random_average()
Expected output should be similar to:
A random model played: 500 games, with an average score of: -773.94
In the Taxi imitation learning lesson you have already seen that Taxi is not solvable with a standard neural network. There were two main reasons for this:
It takes a long time to collect good random games in Taxi.
Taxi gives the states as a single integer value, e.g. 50 or 51, and two neighbouring values can have completely different optimal actions.
In the previous lesson you have seen the Q-learning update function:
Q(s, a) ← Q(s, a) + α * (r + γ * max_a' Q(s', a') - Q(s, a))
which requires both a state s and an action a. Now this state and action space can become huge quite quickly: for a deck of 52 cards there are about 8 × 10^67 different ways to order them, which is more orderings than there are atoms in the universe[1]. By using a neural network we can approximate the Q-values for all these states. This means that the model can predict the Q-value of every action for a given state s.
Since we are updating the Q-values by using the maximum expected reward, we will eventually find all the actions that lead to a high reward. This means that in the beginning we will perform 'random' actions that our model thinks are the best at that time, but over time the policy will get closer and closer to optimal.
The training loop will look very similar to the Q-table update, but instead of taking the values from a Q-table you will now take them from a Keras model, and instead of overwriting a value with a new value you will use the fit function. Since our Q-values are not yet correct at the start, we have to take a lot of samples from the environment and learn the correct values on the fly: we simply take the best value at that time and correct it afterwards.
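Before we get to the full training function further down, here is a minimal sketch of what a single update step looks like with a Keras model. The names model, env, obs and gamma are assumed to exist already; this is only an illustration of the idea, not the complete solution:

# A minimal, hypothetical single update step; `model`, `env`, `obs` and `gamma`
# are assumed to exist already (this is a sketch, not the full solution).
model_x = np.array([obs])
q_values = model.predict(model_x)[0]             # Q-values for every action in this state
action = int(np.argmax(q_values))                # pick the best action according to the model
next_obs, reward, done, info = env.step(action)

# Build the training target: keep the old predictions, only correct the taken action.
target = q_values.copy()
if done:
    target[action] = reward
else:
    next_q = model.predict(np.array([next_obs]))[0]
    target[action] = reward + gamma * np.max(next_q)

# 'Updating the value' now means fitting the model on the corrected target.
model.fit(model_x, np.array([target]), verbose=0)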
To start off you should inherit from the BaseAgent, after which you can create your own model. The embedding layer may be a bit confusing, even after the Keras documentation and the explanation above. Therefore, here is one more attempt from Jason Brownlee, explaining embeddings with examples from NLP in his blog ''How to Use Word Embedding Layers for Deep Learning with Keras''.
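To get a feeling for what the embedding layer does in our setting, here is a small illustrative snippet (the sizes match the solution further down, but it is only a demonstration):

import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding, Reshape

# Embedding(500, 6) is in effect a learnable 500 x 6 table:
# every one of the 500 Taxi states gets its own vector of 6 values.
demo = Sequential()
demo.add(Embedding(500, 6, input_length=1))
demo.add(Reshape((6,)))

state = np.array([[42]])            # a single Taxi state, as an integer index
print(demo.predict(state).shape)    # (1, 6): one row of 6 values for this state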
Using the provided code and the knowledge from this and previous lessons you should be able to implement an algorithm that is able to reach a positive score.
It is better to try some things yourself before you look at the answers below! If you are stuck or need help, please ask a question during the Team Evening or in Slack in the #ec-helpme channel.
Create a standard gym loop.
Update the loop with a model that will be trained, and let the model predict the action.
For an example of how to create a model that uses an embedding layer, see the code below. We are using a smaller model, but there are many good options. One of the more important things to notice is that we are using a learning rate of 1e-2; smaller learning rates, such as 1e-3, will take a lot longer to train.
class TaxiAgent(BaseAgent):
    @staticmethod
    def create_model(input_shape, output_shape, *args, **kwargs):
        model = Sequential()

        # The embedding maps each of the 500 states to 6 values (one per action).
        model.add(Embedding(500, 6, input_length=input_shape))
        model.add(Reshape((6 * input_shape,)))

        model.compile(Adam(learning_rate=1e-2), loss='mae')
        return model
During training you will need random samples of transitions from the memory, which you can get with:
agent.memory.get_batch(batch_size=32)
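The batch comes back as a named tuple of stacked numpy arrays. Assuming the memory already holds at least 32 transitions, inspecting it could look like this (a small illustrative check, not part of the provided code):

transitions = agent.memory.get_batch(batch_size=32)
print(transitions.state.shape)    # (32, 1): the sampled states, stacked into a column
print(transitions.reward.shape)   # (32,): the rewards as a flat array
print(transitions.done[:5])       # the first five done flags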
def train_model(my_agent, gamma=0.99, nr_games=1_000):
    env = gym.make(my_agent.game_name)
    batch_size = 64
    collected_scores = []
    running_mean = my_agent.random_average(500)

    for episode in range(nr_games + 1):
        obs = env.reset()
        done = False
        score = 0

        while not done:
            model_x = np.array([obs])
            action = np.argmax(my_agent.model.predict(model_x)[0])

            next_obs, reward, done, info = env.step(action)
            my_agent.memory.add(obs, action, reward, done, next_obs)
            score += reward

            if len(my_agent.memory) > batch_size:
                transitions = my_agent.memory.get_batch(batch_size=batch_size)

                q_values = my_agent.model.predict(np.array(transitions.state))
                q_values_next = my_agent.model.predict(np.array(transitions.next_state))

                targets = q_values.copy()
                for k in range(batch_size):
                    q_update = transitions.reward[k]
                    if not transitions.done[k]:
                        q_update = transitions.reward[k] + gamma * np.amax(q_values_next[k])
                    targets[k][transitions.action[k]] = q_update

                my_agent.model.fit(transitions.state, targets, verbose=0)

            # Update the observation
            obs = next_obs

        collected_scores.append(score)

        # Update information (for the user)
        running_mean = running_mean * 0.99 + 0.01 * score
        msg_episode = f"\rThe model played episode {episode:5d}, with a score of: {score: 5d},"
        print(msg_episode, end='')

        if episode % 100 == 0:
            msg_stats = f"running mean: {running_mean: 5.0f}"
            print(msg_episode, msg_stats)

    return collected_scores
One of the improvements proposed in the previous lessons was using an epsilon (ε). To understand why this epsilon improves your training, we are going to talk about exploration versus exploitation.
For the agent to perform at its best it needs to see as many states as possible. Using only random actions (exploring) to reach new states is inefficient, because to get into deeper states you need a lot of luck to pick the right random action over and over again. Using only the action that the agent thinks is best (exploiting) will lock you into a single branch of the game, giving you a solution but not necessarily the best solution. It also means the agent cannot learn that some actions seem bad in the short term but are actually good in the long run.
Therefore a mix between exploration and exploitation is good. For this the Greek letter ε (epsilon) is often used: it gives the chance of taking a random action at each step in the game. So given an ε of 1, all actions taken would be random, and given an ε of 0, all actions would be decided by the agent (or model). Try to implement this epsilon in your code, see what the result is, and see what the effect of a higher or lower epsilon is on the training speed.
It is also possible to change the epsilon over time, so you can take into account that a higher epsilon (more random moves) is better at the start and worse towards the end.
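To get a feeling for how fast a multiplicative decay shrinks epsilon, here is a tiny standalone sketch; the 0.99 factor and the 0.05 floor are example values that also appear in the solution below:

epsilon, epsilon_min = 1.0, 0.05

for episode in range(1, 501):
    epsilon = max(epsilon * 0.99, epsilon_min)   # decay once per episode, with a floor
    if episode % 100 == 0:
        print(f"episode {episode}: epsilon = {epsilon:.3f}")

# Roughly 0.366 after 100 episodes, 0.134 after 200, and clipped at the 0.05 floor from ~episode 300.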
def train_model_epsilon(my_agent, gamma=0.99, nr_games=1_000):
    epsilon = 1
    epsilon_min = 0.05
    batch_size = 64
    collected_scores = []
    running_mean = my_agent.random_average(500)
    env = gym.make(my_agent.game_name)

    for episode in range(nr_games + 1):
        obs = env.reset()
        done = False
        score = 0
        epsilon = max(epsilon * 0.99, epsilon_min)

        while not done:
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                model_x = np.array([obs])
                action = np.argmax(my_agent.model.predict(model_x)[0])

            next_obs, reward, done, info = env.step(action)
            my_agent.memory.add(obs, action, reward, done, next_obs)
            score += reward

            if len(my_agent.memory) > batch_size:
                transitions = my_agent.memory.get_batch(batch_size=batch_size)

                q_values = my_agent.model.predict(np.array(transitions.state))
                q_values_next = my_agent.model.predict(np.array(transitions.next_state))

                targets = q_values.copy()
                for k in range(len(q_values)):
                    q_update = transitions.reward[k]
                    if not transitions.done[k]:
                        q_update = transitions.reward[k] + gamma * np.amax(q_values_next[k])
                    targets[k][transitions.action[k]] = q_update

                my_agent.model.fit(transitions.state, targets, verbose=0)

            # Update the observation
            obs = next_obs

        collected_scores.append(score)

        # Update information (for the user)
        running_mean = running_mean * 0.99 + 0.01 * score
        msg_episode = f"\rThe model played episode {episode:5d}, with a score of: {score: 5d},"
        print(msg_episode, end='')

        if episode % 100 == 0:
            msg_stats = f"running mean: {running_mean: 5.0f}, epsilon: {epsilon: 0.5f}"
            print(msg_episode, msg_stats)

    return collected_scores
The running time of this model may vary due to stochastic (random) factors. For example, when the model happens to be initialized with good weights, training will of course speed up a lot. Using a GPU versus a CPU can also make a big difference.
Overall, around 500 training episodes should be enough for a correct implementation and a good random seed. If more than 200 episodes, or roughly 20 minutes of training, are needed before the model shows any sign of improvement, there is probably an implementation error somewhere and you should verify your code.
if __name__ == '__main__':
    # initialize the base agent, and show the random score.
    agent = BaseAgent(game_name='Taxi-v3')
    agent.random_average()

    # initialize your agent
    my_agent = TaxiAgent(game_name='Taxi-v3')
    my_agent.model = TaxiAgent.create_model(input_shape=1, output_shape=6)
    my_agent.memory = MemoryDeque(size=500_000)

    # Simple training loop
    scores = train_model(my_agent)

    # Reset your agent
    my_agent.model = TaxiAgent.create_model(input_shape=1, output_shape=6)
    my_agent.memory = MemoryDeque(size=500_000)

    # Epsilon training loop
    train_model_epsilon(my_agent, nr_games=1_000)

Running this should give output similar to:
A random model played: 500 games, with an average score of: -773.94
The model played episode 0, with a score of: -1127, running mean: -777
The model played episode 100, with a score of: -1199, running mean: -856
The model played episode 200, with a score of: -200, running mean: -737
The model played episode 300, with a score of: -200, running mean: -654
The model played episode 400, with a score of: -326, running mean: -571
The model played episode 500, with a score of: 14, running mean: -345
The model played episode 600, with a score of: 11, running mean: -158
The model played episode 700, with a score of: 6, running mean: -80
The model played episode 800, with a score of: 7, running mean: -38
The model played episode 900, with a score of: 9, running mean: -12
The model played episode 1000, with a score of: 10, running mean: -4
A random model played: 500 games, with an average score of: -763.64
The model played episode 0, with a score of: -776, running mean: -764, epsilon: 0.99000
The model played episode 100, with a score of: -34, running mean: -366, epsilon: 0.36237
The model played episode 200, with a score of: 5, running mean: -138, epsilon: 0.13264
The model played episode 300, with a score of: 12, running mean: -48, epsilon: 0.05000
The model played episode 400, with a score of: 10, running mean: -15, epsilon: 0.05000
The model played episode 500, with a score of: 8, running mean: -3, epsilon: 0.05000
The model played episode 600, with a score of: 4, running mean: 2, epsilon: 0.05000
The model played episode 700, with a score of: 6, running mean: 3, epsilon: 0.05000
The model played episode 800, with a score of: -7, running mean: 4, epsilon: 0.05000
The model played episode 900, with a score of: -7, running mean: 4, epsilon: 0.05000
The model played episode 1000, with a score of: 3, running mean: 5, epsilon: 0.05000
Q-learning with a neural network is an approximation of a Q-table, and is used when a Q-table cannot be used, often due to memory limitations. It therefore usually takes a bit longer to converge than a Q-table and will not always pick the best actions. In games like CartPole or Pong, where there are more states than can possibly fit in memory, a numpy implementation of a Q-table is not possible, but an approximation is. To test your skills you can try to implement this algorithm on CartPole. Think carefully about what you need to change in your current implementation to make it work with CartPole. If you are having difficulties with it, do not hesitate to ask the education committee.
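If you want a head start on CartPole: its observation is a vector of four floats instead of a single integer, so the embedding trick no longer applies. A possible (but by no means the only) model could look like the sketch below; the layer sizes and learning rate are just example values:

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

def create_cartpole_model(input_shape=4, output_shape=2):
    """ Example model for CartPole-v1: 4 continuous inputs, 2 discrete actions. """
    model = Sequential()
    model.add(Dense(24, activation='relu', input_shape=(input_shape,)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(output_shape, activation='linear'))
    model.compile(Adam(learning_rate=1e-3), loss='mse')
    return model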