# HCPI.py (forked from RomainLaroche/SPIBB)
# authors: anonymized
import numpy as np
from modelTransitions import *
from scipy import stats
from math import log, sqrt
import spibb_utils

def policy_evaluation_modified(gamma, pi, r, p, theta=0.000001):
    """
    Evaluate the policy pi by iterating the Bellman expectation operator,
    starting from an all-zeros value estimate.
    Args:
        gamma: discount factor
        pi: policy, array of shape |S| x |A|
        r: rewards, array of shape |S| x |A|
        p: state transition probabilities, array of shape |S| x |A| x |S|
        theta: convergence threshold on the value update
    Return:
        v: 1D array with the state values
        q: array of shape |S| x |A| with the state-action values
    """
    v = np.zeros((pi.shape[0],))
    max_iteration = 10000  # safeguard against non-convergence
    for i in range(max_iteration):
        # Rewards according to policy: Hadamard product and row-wise sum
        r_pi = np.einsum('ij,ij->i', pi, r)
        # Policy-weighted transitions:
        # multiply p by pi by broadcasting pi, then sum over the action axis;
        # the result is an array of shape |S| x |S|
        p_pi = np.einsum('ijk, ij->ik', p, pi)
        # New values
        v_new = r_pi + gamma * np.dot(p_pi, v)
        # Stop condition
        if np.max(np.absolute(v - v_new)) < theta:
            v = v_new
            break
        v = v_new
    q = r + gamma * np.einsum('ijk, k->ij', p, v)  # get q values
    return v, q
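
# Illustrative usage sketch (not part of the original file): the MDP below is a
# made-up 2-state, 2-action example; shapes follow the docstring of
# policy_evaluation_modified above.
def _example_policy_evaluation():
    pi = np.full((2, 2), 0.5)                    # uniform policy, |S| x |A|
    r = np.array([[0.0, 1.0],
                  [0.5, 0.0]])                   # rewards, |S| x |A|
    p = np.array([[[0.9, 0.1], [0.2, 0.8]],
                  [[0.5, 0.5], [1.0, 0.0]]])     # transitions, |S| x |A| x |S|
    v, q = policy_evaluation_modified(0.95, pi, r, p)
    return v, q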

class EstimatorReturn:
    """
    Estimate the return of a target policy pi_t
    given a set of trajectories generated by a baseline policy pi_b.
    The available estimators are:
    - importance sampling;
    - weighted importance sampling;
    - per-decision importance sampling;
    - weighted per-decision importance sampling;
    - doubly robust.
    A usage sketch is provided after this class.
    """
    def __init__(self, gamma, pi_b, pi_t, estimator_type, R, list_format=True, trajectories_reg=None):
        self.estimator_dict = {"importance_sampling": self.importance_sampling,
                               "weighted_importance_sampling": self.weighted_importance_sampling,
                               "weighted_per_decision_IS": self.weighted_per_decision_importance_sampling,
                               "per_decision_IS": self.per_decision_importance_sampling,
                               "doubly_robust": self.doubly_robust
                               }
        self.R = R
        self.gamma = gamma
        self.estimator_type = estimator_type
        self.pi_b = pi_b
        self.pi_t = pi_t
        self.list_format = list_format
        self.trajectories_reg = trajectories_reg

    def __call__(self, trajectories):
        return self.estimator_dict[self.estimator_type](trajectories)

    def importance_sampling(self, trajectories):
        if self.list_format:
            estimate = []
        else:
            estimate = 0
        for trajectory in trajectories:
            t = 0
            cum_rew = 0
            weight = 1
            for [action, state, _, reward] in trajectory:
                cum_rew += self.gamma ** t * reward
                weight = weight * (self.pi_t[state, action] / self.pi_b[state, action])
                t += 1
            if self.list_format:
                estimate.append(weight * cum_rew)
            else:
                estimate += weight * cum_rew
        if self.list_format:
            return estimate
        else:
            return estimate / len(trajectories)

    def weighted_importance_sampling(self, trajectories):
        if self.list_format:
            estimate = []
        else:
            estimate = 0
        sum_weight = 0
        for trajectory in trajectories:
            t = 0
            cum_rew = 0
            weight = 1
            for [action, state, _, reward] in trajectory:
                cum_rew += self.gamma ** t * reward
                weight = weight * (self.pi_t[state, action] / self.pi_b[state, action])
                t += 1
            if self.list_format:
                estimate.append(weight * cum_rew)
            else:
                estimate += weight * cum_rew
            sum_weight += weight
        if self.list_format:
            # Self-normalize so that the mean of the returned list equals the
            # weighted importance sampling estimate of the whole batch
            return [x * len(trajectories) / sum_weight for x in estimate]
        else:
            return estimate / sum_weight

    def per_decision_importance_sampling(self, trajectories, max_steps=200):
        if self.list_format:
            estimate = []
        else:
            estimate = 0
        # rho[t, n]: cumulative importance weight of trajectory n up to time step t
        rho = np.zeros((max_steps, len(trajectories)))
        n = 0
        for trajectory in trajectories:
            weight = 1
            t = 0
            for [action, state, _, _] in trajectory:
                current_weight = self.pi_t[state, action] / self.pi_b[state, action]
                weight = weight * current_weight
                rho[t, n] = weight
                t += 1
            n += 1
        n = 0
        for trajectory in trajectories:
            t = 0
            cum_rew = 0
            for [_, _, _, reward] in trajectory:
                cum_rew += self.gamma ** t * reward * float(rho[t, n])
                t += 1
            if self.list_format:
                estimate.append(cum_rew)
            else:
                estimate += cum_rew
            n += 1
        if self.list_format:
            return estimate
        else:
            return float(estimate) / len(trajectories)

    def weighted_per_decision_importance_sampling(self, trajectories, max_steps=200):
        if self.list_format:
            estimate = []
        else:
            estimate = 0
        # rho[t, n]: cumulative importance weight of trajectory n up to time step t
        rho = np.zeros((max_steps, len(trajectories)))
        # sum_per_decision[t]: sum of the cumulative weights over the batch at
        # time step t, used to self-normalize the per-decision weights
        sum_per_decision = np.zeros((max_steps, 1))
        n = 0
        for trajectory in trajectories:
            weight = 1
            t = 0
            for [action, state, _, _] in trajectory:
                current_weight = self.pi_t[state, action] / self.pi_b[state, action]
                weight = weight * current_weight
                rho[t, n] = weight
                sum_per_decision[t] += weight
                t += 1
            n += 1
        n = 0
        for trajectory in trajectories:
            t = 0
            cum_rew = 0
            for [_, _, _, reward] in trajectory:
                cum_rew += self.gamma ** t * reward * float(rho[t, n]) / float(sum_per_decision[t])
                t += 1
            if self.list_format:
                # Scale by the batch size so that the mean of the returned list
                # equals the weighted per-decision estimate of the whole batch
                estimate.append(len(trajectories) * cum_rew)
            else:
                estimate += cum_rew
            n += 1
        if self.list_format:
            return estimate
        else:
            return float(estimate)

    def doubly_robust(self, trajectories):
        """
        As introduced by Jiang and Li, 2015;
        makes use of a control variate built from an approximate model of
        the MDP (the step-wise recursion is spelled out in the comment
        block below).
        :param trajectories: a set of trajectories
        :return: an estimate of the return
        """
        if self.list_format:
            estimate = self.compute_estimate_dr(self.trajectories_reg, trajectories)
            return estimate
        else:
            # 2-fold DR: fit the model on one half of the batch, evaluate on
            # the other half, then swap the roles and average the two estimates
            index_mid = int(len(trajectories) / 2)
            trajectories_reg = trajectories[:index_mid]
            trajectories_eval = trajectories[index_mid:]
            estimate1 = self.compute_estimate_dr(trajectories_reg, trajectories_eval, False)
            estimate2 = self.compute_estimate_dr(trajectories_eval, trajectories_reg, False)
            return (estimate1 + estimate2) / 2.
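
    # The recursion implemented in compute_estimate_dr below, applied backwards
    # from the end of each trajectory (notation: V_hat and Q_hat are the values
    # of pi_t in the approximate model, rho_t = pi_t(a_t|s_t) / pi_b(a_t|s_t)):
    #
    #   DR_{T+1} = 0
    #   DR_t     = V_hat(s_t) + rho_t * (r_t + gamma * DR_{t+1} - Q_hat(s_t, a_t))
    #
    # so that DR_1 is the doubly robust estimate of the return of one trajectory.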

    def compute_estimate_dr(self, trajectories_reg, trajectories_eval, is_list=True):
        if is_list:
            estimate = []
        else:
            estimate = 0
        # Fit the approximate model on trajectories_reg and evaluate pi_t on it
        [V_hat, Q_hat] = self.estimate_q(trajectories_reg)
        for trajectory in trajectories_eval:
            estimate_trajectory = 0
            # Backward recursion over the trajectory (see the comment block above)
            for [action, state, _, reward] in trajectory[::-1]:
                estimate_trajectory = float(V_hat[state]) + \
                    self.pi_t[state, action] / self.pi_b[state, action] * (reward +
                    self.gamma * estimate_trajectory - float(Q_hat[state, action]))
            if is_list:
                estimate.append(estimate_trajectory)
            else:
                estimate += estimate_trajectory
        if not is_list:
            estimate = estimate / len(trajectories_eval)
        return estimate

    def estimate_q(self, trajectories):
        # Build an approximate model of the MDP from the batch and evaluate
        # the target policy pi_t on it
        batch = []
        for trajectory in trajectories:
            for [action, state, next_state, reward] in trajectory:
                batch.append([action, state, next_state, reward])
        model = ModelTransitions(batch, self.pi_b.shape[0], self.pi_b.shape[1])
        reward_model = spibb_utils.get_reward_model(model.transitions, self.R)
        return policy_evaluation_modified(self.gamma, self.pi_t, reward_model, model.transitions)
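
# Illustrative usage sketch (not part of the original file): off-policy estimate
# of a target policy's return from a hypothetical batch of trajectories. The
# policies and trajectories below are made up; each transition follows the
# [action, state, next_state, reward] convention used above. R is only needed
# by the doubly robust estimator, so it is left as None here.
def _example_estimator_return():
    pi_b = np.array([[0.5, 0.5], [0.5, 0.5]])    # baseline policy, |S| x |A|
    pi_t = np.array([[0.9, 0.1], [0.2, 0.8]])    # target policy, |S| x |A|
    trajectories = [
        [[0, 0, 1, 1.0], [1, 1, 0, 0.0]],        # one trajectory of two steps
        [[1, 0, 0, 0.5], [0, 0, 1, 1.0]],
    ]
    estimator = EstimatorReturn(gamma=0.95, pi_b=pi_b, pi_t=pi_t,
                                estimator_type="importance_sampling",
                                R=None, list_format=False)
    return estimator(trajectories)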

class LowerBoundEstimator:
    """
    Estimate a lower bound on the return of the target policy.
    The available strategies are (following HCPI, Philip Thomas, 2015):
    - concentration inequality (CI);
    - Student t-test;
    - BCa (bootstrapping based method).
    A usage sketch is provided at the end of the file.
    """
    def __init__(self, gamma, pi_b, pi_t, lower_bound_strategy, confidence, estimator_type,
                 rho_min, rho_max, R, trajectories_reg=None):
        self.strategy_dict = {"CI": self.confidence_interval_based,
                              "student_t_test": self.student_t_test,
                              "BCa": self.bootstrap_method
                              }
        self.R = R
        self.lower_bound_strategy = lower_bound_strategy
        self.trajectories_reg = trajectories_reg
        self.gamma = gamma
        self.pi_b = pi_b
        self.pi_t = pi_t
        self.confidence = confidence
        self.estimator_type = estimator_type
        # rho_min and rho_max bound the return and are used to normalize it to [0, 1]
        self.rho_min = rho_min
        self.rho_max = rho_max

    def __call__(self, trajectories):
        return self.strategy_dict[self.lower_bound_strategy](trajectories)

    def confidence_interval_based(self, trajectories):
        """
        Concentration-inequality lower bound computed on the normalized,
        truncated importance-weighted returns (empirical Bernstein style
        bound, cf. Thomas et al., 2015).
        :param trajectories: a batch of trajectories
        :return: a lower bound on the estimated return
        """
        c = 0.5  # truncation level of the normalized returns
        estimator = EstimatorReturn(self.gamma, self.pi_b, self.pi_t, self.estimator_type,
                                    self.R, True, self.trajectories_reg)
        list_estimates = estimator(trajectories)
        list_estimates = self.normalize_return(list_estimates)
        list_estimates_cut = [min(x, c) for x in list_estimates]
        # Pairwise squared differences of the truncated estimates (variance term)
        cross_subtract_squared = np.square(np.subtract.outer(list_estimates_cut, list_estimates_cut))
        n = len(list_estimates)
        lower_bound = (1. / n) * np.sum(list_estimates_cut) \
            - (7 * c * log(2. / self.confidence)) / (3. * (n - 1)) \
            - (1. / n) * sqrt(log(2. / self.confidence) / (n - 1) * np.sum(cross_subtract_squared))
        return lower_bound * (self.rho_max - self.rho_min) + self.rho_min

    def student_t_test(self, trajectories):
        """
        Warning: this method relies on the assumption that the return is normally distributed.
        :param trajectories: a batch of trajectories
        :return: a lower bound on the estimated return
        """
        estimator = EstimatorReturn(self.gamma, self.pi_b, self.pi_t, self.estimator_type,
                                    self.R, True, self.trajectories_reg)
        list_estimates = estimator(trajectories)
        list_estimates = self.normalize_return(list_estimates)
        estimated_return = np.mean(list_estimates)
        n = len(list_estimates)
        # Unbiased sample standard deviation
        sigma = np.sqrt(1. / (n - 1) * np.sum(np.square(np.array(list_estimates) - estimated_return)))
        # One-sided Student t lower bound at confidence level 1 - self.confidence
        lower_bound = estimated_return - sigma / sqrt(n) * stats.t.ppf(1 - self.confidence, n - 1)
        return lower_bound * (self.rho_max - self.rho_min) + self.rho_min

    def bootstrap_method(self, trajectories):
        """
        Warning: this method is approximate and can be time consuming in practice.
        It implements the percentile bootstrap: the bound is the
        self.confidence-quantile of the bootstrapped estimates (the BCa
        correction is not applied here).
        :param trajectories: a batch of trajectories
        :return: an approximate lower bound on the estimated return
        """
        n_bootstrap = 2000  # number of bootstrap resamples, as recommended in Thomas, 2015
        list_estimated_return = []
        n = len(trajectories)
        list_indexes = np.arange(n)
        estimator = EstimatorReturn(self.gamma, self.pi_b, self.pi_t, self.estimator_type,
                                    self.R, False, self.trajectories_reg)
        for time_repeat in range(n_bootstrap):
            # Resample the trajectories with replacement and re-estimate the return
            indexes_chosen = np.random.choice(list_indexes, n, True)
            current_trajectories = [trajectories[i] for i in indexes_chosen]
            list_estimated_return.append(estimator(current_trajectories))
        index_to_consider = int(self.confidence * n_bootstrap)
        list_estimated_return.sort()
        return list_estimated_return[index_to_consider]

    def normalize_return(self, list_estimate):
        return [(x - self.rho_min) / (self.rho_max - self.rho_min) for x in list_estimate]
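
# Illustrative usage sketch (not part of the original file): a lower bound at
# confidence level 1 - 0.05 on the return of a hypothetical target policy. The
# policies, trajectories and return range below are made up; the trajectory
# format is [action, state, next_state, reward] as above.
def _example_lower_bound():
    pi_b = np.array([[0.5, 0.5], [0.5, 0.5]])    # baseline policy, |S| x |A|
    pi_t = np.array([[0.9, 0.1], [0.2, 0.8]])    # target policy, |S| x |A|
    trajectories = [
        [[0, 0, 1, 1.0], [1, 1, 0, 0.0]],
        [[1, 0, 0, 0.5], [0, 0, 1, 1.0]],
        [[0, 1, 1, 0.0], [0, 0, 1, 1.0]],
    ]
    lower_bound = LowerBoundEstimator(gamma=0.95, pi_b=pi_b, pi_t=pi_t,
                                      lower_bound_strategy="student_t_test",
                                      confidence=0.05,
                                      estimator_type="importance_sampling",
                                      rho_min=0., rho_max=2., R=None)
    return lower_bound(trajectories)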