mc_agents.py
import numpy as np
import numpy.random as npr
import neuronav.utils as utils
from neuronav.agents.base_agent import BaseAgent


class QEC(BaseAgent):
    """
    Implementation of the episodic control Q-learning algorithm.

    Experiences are buffered over an episode; at episode end, each visited
    state-action Q-value is set to the maximum of its current value and the
    observed discounted return.
    """

    def __init__(
        self,
        state_size: int,
        action_size: int,
        lr: float = 1e-1,
        gamma: float = 0.99,
        poltype: str = "softmax",
        beta: float = 1e4,
        epsilon: float = 1e-1,
        Q_init=None,
        **kwargs
    ):
        super().__init__(state_size, action_size, lr, gamma, poltype, beta, epsilon)
        # Initialize the Q-table: zeros by default, randomly scaled by Q_init
        # if a scalar is given, or the provided array otherwise.
        if Q_init is None:
            self.Q = np.zeros((action_size, state_size))
        elif np.isscalar(Q_init):
            self.Q = Q_init * npr.randn(action_size, state_size)
        else:
            self.Q = Q_init

    def sample_action(self, state):
        Qs = self.Q[:, state]
        return self.base_sample_action(Qs)

    def _update(self, current_exp, **kwargs):
        # Buffer the experience; a backup only happens at episode end.
        s, sa, s_1, r, d = current_exp
        self.exp_list.append(current_exp)
        if d:
            self.backup(self.exp_list)
        return None

    def q_estimate(self, state):
        return self.Q[:, state]

    def backup(self, exp_list):
        # Compute discounted returns for the episode and update each visited
        # Q-value to the max of its stored value and the observed return.
        rewards = [exp[3] for exp in exp_list]
        returns = self.discount(rewards, self.gamma)
        for i, exp in enumerate(exp_list):
            s, sa, s_1, r, d = exp
            self.Q[sa, s] = np.max([self.Q[sa, s], returns[i]])
        self.exp_list = []

    def get_policy(self):
        return self.base_get_policy(self.Q)

    def reset(self):
        self.exp_list = []
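
# --- Usage sketch (not part of the original module) -------------------------
# A minimal illustration, under assumed toy inputs, of how QEC's episodic
# backup behaves: a Q-value is overwritten only when the observed discounted
# return exceeds the stored value. The hand-written episode below is
# hypothetical; in practice experiences would come from an environment loop.
def _demo_qec():
    agent = QEC(state_size=5, action_size=2)
    agent.reset()  # initializes the episode buffer (exp_list)
    # Experience tuples follow the (state, action, next_state, reward, done)
    # layout unpacked in _update above.
    episode = [(0, 1, 2, 0.0, False), (2, 0, 4, 1.0, True)]
    for exp in episode:
        agent._update(exp)  # the backup runs automatically on the terminal step
    return agent.Q
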
class QMC(BaseAgent):
    """
    Implementation of Monte Carlo Q-learning.

    Experiences are buffered over an episode; at episode end, each visited
    state-action Q-value is moved toward the observed discounted return by a
    step of size lr.
    """

    def __init__(
        self,
        state_size: int,
        action_size: int,
        lr: float = 1e-1,
        gamma: float = 0.99,
        poltype: str = "softmax",
        beta: float = 1e4,
        epsilon: float = 1e-1,
        Q_init=None,
        **kwargs
    ):
        super().__init__(state_size, action_size, lr, gamma, poltype, beta, epsilon)
        # Initialize the Q-table: zeros by default, randomly scaled by Q_init
        # if a scalar is given, or the provided array otherwise.
        if Q_init is None:
            self.Q = np.zeros((action_size, state_size))
        elif np.isscalar(Q_init):
            self.Q = Q_init * npr.randn(action_size, state_size)
        else:
            self.Q = Q_init

    def sample_action(self, state):
        Qs = self.Q[:, state]
        return self.base_sample_action(Qs)

    def q_estimate(self, state):
        return self.Q[:, state]

    def _update(self, current_exp, **kwargs):
        # Buffer the experience; a backup only happens at episode end.
        s, sa, s_1, r, d = current_exp
        self.exp_list.append(current_exp)
        if d:
            self.backup(self.exp_list)
        return None

    def backup(self, exp_list):
        # Compute discounted returns for the episode and move each visited
        # Q-value toward its observed return by a step of size lr.
        rewards = [exp[3] for exp in exp_list]
        returns = self.discount(rewards, self.gamma)
        for i, exp in enumerate(exp_list):
            s, sa, s_1, r, d = exp
            delta = returns[i] - self.Q[sa, s]
            self.Q[sa, s] += self.lr * delta
        self.exp_list = []

    def get_policy(self):
        return self.base_get_policy(self.Q)

    def reset(self):
        self.exp_list = []
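
# --- Usage sketch (not part of the original module) -------------------------
# A minimal illustration, using the same assumed toy episode as _demo_qec,
# contrasting QMC's update: rather than taking a max, each visited Q-value is
# nudged toward its observed return by a fraction lr.
def _demo_qmc():
    agent = QMC(state_size=5, action_size=2, lr=0.1)
    agent.reset()  # initializes the episode buffer (exp_list)
    episode = [(0, 1, 2, 0.0, False), (2, 0, 4, 1.0, True)]
    for exp in episode:
        agent._update(exp)  # the backup runs automatically on the terminal step
    return agent.Q


if __name__ == "__main__":
    print("QEC Q-table after one toy episode:\n", _demo_qec())
    print("QMC Q-table after one toy episode:\n", _demo_qmc())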