cartpole_approx.py
import gym
from keras.layers import Dense
from keras.models import Sequential
import numpy as np
import random
# Seed NumPy and the stdlib RNG (random.sample is used for minibatch sampling)
np.random.seed(0)
random.seed(0)

# Epsilon-greedy exploration schedule
epsilon = 1.0
exploration_decay = 0.99
min_epsilon = 0.1

# Discount factor (undiscounted returns)
gamma = 1.0

# Rolling window of the last 100 episode lengths, used to detect success
ave_steps_per_episode = np.zeros((100, 1))
idx_steps = -1
env = gym.make('CartPole-v0')
env.seed(0)
def q_model(observations, actions):
    # Small fully connected network mapping a state vector to one Q-value per
    # discrete action (uses the older Keras 'init' keyword for weight initialisation)
    model = Sequential()
    model.add(Dense(8, batch_input_shape=(None, observations), init='lecun_uniform', activation='relu'))
    model.add(Dense(8, init='lecun_uniform', activation='relu'))
    model.add(Dense(actions, activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    return model
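
# (Illustration, assumed usage) q_model(4, 2) builds the CartPole network used
# below: feeding it a batch of observations with shape (N, 4) returns an (N, 2)
# array of Q-values, one column per discrete action (push cart left / right).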
def select_action(qs):
    # Epsilon-greedy action selection: exploit the current greedy action with
    # probability (1 - epsilon), otherwise explore with a random action
    allow_exploration = np.random.rand()
    if allow_exploration > epsilon:
        return np.argmax(qs)
    else:
        return env.action_space.sample()
def final_agent_test():
    # Once training reports success, run the greedy policy for ten rendered episodes
    env = gym.make("CartPole-v0")
    for i_episode in range(10):
        state = env.reset()
        for t in range(500):
            env.render()
            action = np.argmax(q_vals.predict(np.reshape(state, (1, 4))))
            next_state, reward, done, info = env.step(action)
            state = next_state
            if done:
                break
        print("Episode {} finished after {} timesteps with final Reward = {}".format(i_episode + 1, t + 1, reward))
# Model Definition: CartPole-v0 has a 4-dimensional observation and 2 discrete actions
q_vals = q_model(observations=4, actions=2)

# Experience replay buffer (circular, capacity max_size) of states and their TD targets
experience_states = []
experience_values = []
experience_idx = -1
full_arrays = 0
max_size = 100
batch_size = 20
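
# Training scheme used below: every environment step stores the pair
# (state, target Q-vector) in the circular buffer above. Once the buffer holds
# max_size entries, a random minibatch of batch_size entries is drawn and fitted
# with train_on_batch on every other step. Note that the stored targets are the
# ones computed when the pair was written and are not refreshed as the network
# changes, a simplification relative to replay buffers that store raw
# transitions and recompute targets at sampling time.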
for i_episode in xrange(5000):
    state = env.reset()
    for t in xrange(199):
        # Q-values of the current state's actions
        Q_state = q_vals.predict(np.reshape(state, (1, 4)))
        action = select_action(Q_state)
        # System response to the chosen action
        next_state, reward, done, info = env.step(action)
        # Q-values of the next state, used to bootstrap the greedy target
        Q_next_state = q_vals.predict(np.reshape(next_state, (1, 4)))
        # Penalise failure and compute the target for the action actually taken
        if done:
            reward = -100
            Q_state[0][action] = reward
        else:
            Q_state[0][action] = reward + gamma * np.max(Q_next_state[:])
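        # (Explanatory note) This is the standard Q-learning / TD(0) target:
        #   target = r                               if the next state is terminal
        #   target = r + gamma * max_a' Q(s', a')    otherwise
        # Only the entry for the chosen action is overwritten, so the MSE loss
        # leaves the other action's Q-value unchanged for this sample.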
        # Increment the experience index
        experience_idx += 1
        # Fill the replay buffer until it holds max_size entries ...
        if experience_idx < max_size and not full_arrays:
            experience_states.append(state)
            experience_values.append(Q_state[0])
        else:
            # ... then overwrite the oldest entries in circular fashion
            full_arrays = 1
            if experience_idx == max_size:
                experience_idx = 0
            experience_states[experience_idx] = state
            experience_values[experience_idx] = Q_state[0]
            # Train on a random minibatch every other step once the buffer is full
            batch = random.sample(range(max_size), batch_size)
            if experience_idx % 2 == 0:
                q_vals.train_on_batch(np.array(experience_states)[batch], np.array(experience_values)[batch])
        # The next state becomes the current state for the next iteration
        state = next_state
        if done or t == 198:
            if (i_episode + 1) % 10 == 0:
                print("Episode {} finished after {} timesteps | Epsilon = {}".format(i_episode + 1, t + 1, epsilon))
            break
    # Decay exploration once per episode
    if epsilon > min_epsilon:
        epsilon *= exploration_decay
    else:
        epsilon = min_epsilon
    # Keep a running average of episode length over the last 100 episodes
    idx_steps += 1
    if idx_steps > 99:
        idx_steps = 0
    ave_steps_per_episode[idx_steps] = t + 1
    # CartPole-v0 counts as solved at an average of 195 steps over 100 episodes
    if np.mean(ave_steps_per_episode) >= 195:
        final_agent_test()
        break
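
# Optional sketch (not part of the original flow; the filename is arbitrary):
# once training finishes, the learned weights could be persisted with Keras'
# save_weights / load_weights so the greedy policy can be reused without retraining.
# q_vals.save_weights('cartpole_q_weights.h5')
# ... and later, after rebuilding the same architecture:
# q_vals = q_model(observations=4, actions=2)
# q_vals.load_weights('cartpole_q_weights.h5')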