dyna_agents.py
import numpy as np
import numpy.random as npr

from neuronav.agents.td_agents import TDAC, TDQ, TDSR


class DynaModule:
    """
    Class which contains the logic to enable Dyna algorithms.
    """

    def __init__(self, state_size, num_recall=3, recency="exponential", **kwargs):
        self.num_recall = num_recall
        self.recency = recency
        self.model = {}
        self.prioritized_states = np.zeros(state_size, dtype=int)

    def _sample_model(self):
        # sample a previously visited state
        past_states = [k[0] for k in self.model.keys()]
        sampled_state = past_states[npr.choice(len(past_states))]
        # sample an action previously taken from the sampled state
        past_actions = [k[1] for k in self.model.keys() if k[0] == sampled_state]
        sampled_action = past_actions[npr.choice(len(past_actions))]
        key = (sampled_state, sampled_action)
        # look up (next_state, reward, done) and assemble the replayed experience
        if self.recency == "exponential":
            successors = self.model[key][1]
            idx = np.minimum(len(successors) - 1, int(npr.exponential(scale=5)))
            successor = successors[::-1][idx]
        else:
            successor = self.model[key][1]
        exp = key + successor
        return exp

    def update(self, base_agent, current_exp, **kwargs):
        state, action, next_state, reward, done = current_exp
        # update the (deterministic) model
        key = (state, action)
        value = (next_state, reward, done)
        if self.recency == "exponential":
            if key in self.model:
                successors = self.model[key][1]
                successors.append(value)
                # keep only the most recent successors; they capture >99% of the
                # sampling probability mass under the exponential recency weighting
                successors = successors[-25:]
                self.model[key] = base_agent.num_updates, successors
            else:
                self.model[key] = base_agent.num_updates, [value]
        else:
            self.model[key] = base_agent.num_updates, value
        # replay num_recall experiences sampled from the learned model
        for i in range(self.num_recall):
            exp = self._sample_model()
            self.prioritized_states[exp[0]] += 1
            base_agent._update(exp)
        return base_agent
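
# Why keeping only the last 25 successors is enough: with recency="exponential",
# _sample_model draws an index from npr.exponential(scale=5) and counts backwards
# from the most recent successor, so P(index >= 25) = exp(-25 / 5) ≈ 0.0067 and
# the retained entries cover more than 99% of the sampling probability mass.
#
#     >>> import numpy as np
#     >>> float(np.exp(-25 / 5))
#     0.006737946999085467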

class DynaQ(TDQ):
    """
    Dyna-enabled version of Temporal Difference Q-learning algorithm.
    """

    def __init__(
        self,
        state_size: int,
        action_size: int,
        lr: float = 1e-1,
        gamma: float = 0.99,
        poltype: str = "softmax",
        beta: float = 1e4,
        epsilon: float = 1e-1,
        w_value: float = 1.0,
    ):
        super(DynaQ, self).__init__(
            state_size,
            action_size,
            lr=lr,
            gamma=gamma,
            poltype=poltype,
            beta=beta,
            epsilon=epsilon,
            w_value=w_value,
        )
        self.dyna = DynaModule(state_size)

    def update(self, current_exp):
        _ = super().update(current_exp)
        self = self.dyna.update(self, current_exp)

class DynaAC(TDAC):
    """
    Dyna-enabled version of Temporal Difference Actor Critic algorithm.
    """

    def __init__(
        self,
        state_size: int,
        action_size: int,
        lr: float = 1e-1,
        gamma: float = 0.99,
        poltype: str = "softmax",
        beta: float = 1e4,
        epsilon: float = 1e-1,
    ):
        super(DynaAC, self).__init__(
            state_size,
            action_size,
            lr=lr,
            gamma=gamma,
            poltype=poltype,
            beta=beta,
            epsilon=epsilon,
        )
        self.dyna = DynaModule(state_size)

    def update(self, current_exp):
        _ = super().update(current_exp)
        self = self.dyna.update(self, current_exp)

class DynaSR(TDSR):
    """
    Dyna-enabled version of Temporal Difference Successor Representation algorithm.
    """

    def __init__(
        self,
        state_size: int,
        action_size: int,
        lr: float = 1e-1,
        gamma: float = 0.99,
        poltype: str = "softmax",
        beta: float = 1e4,
        epsilon: float = 1e-1,
    ):
        super(DynaSR, self).__init__(
            state_size,
            action_size,
            lr=lr,
            gamma=gamma,
            poltype=poltype,
            beta=beta,
            epsilon=epsilon,
        )
        self.dyna = DynaModule(state_size)

    def update(self, current_exp):
        _ = super().update(current_exp)
        self = self.dyna.update(self, current_exp)
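
# Minimal usage sketch (illustrative only; the state/action sizes and synthetic
# transitions below are made up). Any discrete environment producing experience
# tuples in the order (state, action, next_state, reward, done) — the order
# unpacked in DynaModule.update — would be used the same way.
if __name__ == "__main__":
    n_states, n_actions = 16, 4  # hypothetical environment dimensions
    agent = DynaQ(n_states, n_actions, lr=1e-1, gamma=0.99)

    # Feed a few synthetic experiences; each call performs one direct TD update
    # plus num_recall replayed updates drawn from the learned model.
    for exp in [(0, 1, 2, 0.0, False), (2, 3, 5, 1.0, True)]:
        agent.update(exp)

    # prioritized_states counts how often each state was replayed by DynaModule.
    print(agent.dyna.prioritized_states)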