import itertools

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from generate_synthetic_data import Logic_Model_Generator
##################################################################
class Logic_Model(nn.Module):
    # As an example, consider three rules:
    #   A and B and Equal(A, B) and Before(A, D), then D;
    #   C and Before(C, Not D), then Not D;
    #   D, then E, with Equal(D, E).
    # Note: define the temporal predicates as compactly as possible.
    def __init__(self):
        super(Logic_Model, self).__init__()
        ### the following parameters are used to manually define the logic rules
        self.num_predicate = 5  # num_predicate is the same as num_node
        self.num_formula = 3
        self.BEFORE = 'BEFORE'
        self.EQUAL = 'EQUAL'
        self.AFTER = 'AFTER'
        self.Time_tolerance = 0.5
        self.head_predicate_set = [3, 4]  # the index set of all head predicates
        self.integral_resolution = 0.03
        self.decay_rate = 1

        ### the following parameters are used to generate synthetic data;
        ### for the learning part, they declare the learnable variables
        self.model_parameter = {}

        head_predicate_idx = 3
        self.model_parameter[head_predicate_idx] = {}
        self.model_parameter[head_predicate_idx]['base'] = torch.tensor([-0.2], dtype=torch.float64, requires_grad=True)
        formula_idx = 0
        self.model_parameter[head_predicate_idx][formula_idx] = {}
        self.model_parameter[head_predicate_idx][formula_idx]['weight'] = torch.tensor([0.01], dtype=torch.float64, requires_grad=True)
        formula_idx = 1
        self.model_parameter[head_predicate_idx][formula_idx] = {}
        self.model_parameter[head_predicate_idx][formula_idx]['weight'] = torch.tensor([0.01], dtype=torch.float64, requires_grad=True)

        head_predicate_idx = 4
        self.model_parameter[head_predicate_idx] = {}
        self.model_parameter[head_predicate_idx]['base'] = torch.tensor([0.0], dtype=torch.float64, requires_grad=True)
        formula_idx = 0
        self.model_parameter[head_predicate_idx][formula_idx] = {}
        self.model_parameter[head_predicate_idx][formula_idx]['weight'] = torch.tensor([1.0], dtype=torch.float64, requires_grad=True)

        self.logic_template = self.logic_rule()
    def logic_rule(self):
        # encode the rule information
        logic_template = {}

        head_predicate_idx = 3  # index of the head predicate; there can be multiple head predicates
        logic_template[head_predicate_idx] = {}

        formula_idx = 0
        logic_template[head_predicate_idx][formula_idx] = {}
        logic_template[head_predicate_idx][formula_idx]['body_predicate_idx'] = [0, 1]
        logic_template[head_predicate_idx][formula_idx]['body_predicate_sign'] = [1, 1]  # 1 indicates True; -1 indicates False
        logic_template[head_predicate_idx][formula_idx]['head_predicate_sign'] = [1]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_idx'] = [[0, 1], [0, 3]]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_type'] = [self.EQUAL, self.BEFORE]

        formula_idx = 1
        logic_template[head_predicate_idx][formula_idx] = {}
        logic_template[head_predicate_idx][formula_idx]['body_predicate_idx'] = [2]
        logic_template[head_predicate_idx][formula_idx]['body_predicate_sign'] = [1]  # 1 indicates True; -1 indicates False
        logic_template[head_predicate_idx][formula_idx]['head_predicate_sign'] = [-1]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_idx'] = [[2, 3]]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_type'] = [self.BEFORE]

        head_predicate_idx = 4  # a second head predicate
        logic_template[head_predicate_idx] = {}

        formula_idx = 0
        logic_template[head_predicate_idx][formula_idx] = {}
        logic_template[head_predicate_idx][formula_idx]['body_predicate_idx'] = [3]
        logic_template[head_predicate_idx][formula_idx]['body_predicate_sign'] = [1]
        logic_template[head_predicate_idx][formula_idx]['head_predicate_sign'] = [1]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_idx'] = [[3, 4]]
        logic_template[head_predicate_idx][formula_idx]['temporal_relation_type'] = [self.EQUAL]

        return logic_template
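
    # Worked reading of the encoding above, with predicates 0..4 standing for
    # A..E from the rules in the class comment:
    #   head 3 (D), formula 0: body [0, 1] = {A, B}, with Equal(A, B) and
    #     Before(A, D), head sign +1  -->  "A and B ... then D"
    #   head 3 (D), formula 1: body [2] = {C}, with Before(C, D), head sign -1
    #     -->  "C ... then Not D"
    #   head 4 (E), formula 0: body [3] = {D}, with Equal(D, E), head sign +1
    #     -->  "D then E"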
    def intensity(self, cur_time, head_predicate_idx, history):
        feature_formula = []
        weight_formula = []
        effect_formula = []
        for formula_idx in list(self.logic_template[head_predicate_idx].keys()):
            weight_formula.append(self.model_parameter[head_predicate_idx][formula_idx]['weight'])
            feature_formula.append(self.get_feature(cur_time=cur_time, head_predicate_idx=head_predicate_idx,
                                                    history=history,
                                                    template=self.logic_template[head_predicate_idx][formula_idx]))
            effect_formula.append(self.get_formula_effect(cur_time=cur_time, head_predicate_idx=head_predicate_idx,
                                                          history=history,
                                                          template=self.logic_template[head_predicate_idx][formula_idx]))
        weight = torch.cat(weight_formula, dim=0)
        # softmax-normalized formula weights, times each formula's feature and effect
        intensity = torch.softmax(weight, dim=0) * torch.cat(feature_formula, dim=0) * torch.cat(effect_formula, dim=0)
        intensity = self.model_parameter[head_predicate_idx]['base'] + torch.sum(intensity)
        intensity = torch.exp(intensity)
        return intensity
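
    # In formula form, the intensity computed above for head predicate h is
    #   lambda_h(t) = exp( base_h + sum_f softmax(w)_f * feature_f(t) * effect_f(t) ),
    # where base_h and the formula weights w are the learnable parameters.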
    def get_feature(self, cur_time, head_predicate_idx, history, template):
        transition_time_dic = {}
        feature = torch.tensor([0], dtype=torch.float64)
        for idx, body_predicate_idx in enumerate(template['body_predicate_idx']):
            transition_time = np.array(history[body_predicate_idx]['time'])
            transition_state = np.array(history[body_predicate_idx]['state'])
            mask = (transition_time <= cur_time) * (transition_state == template['body_predicate_sign'][idx])
            transition_time_dic[body_predicate_idx] = transition_time[mask]
        transition_time_dic[head_predicate_idx] = [cur_time]
        # compute the feature whenever every entry of transition_time_dic is nonempty
        history_transition_len = [len(i) for i in transition_time_dic.values()]
        if min(history_transition_len) > 0:
            # evaluate the temporal relations of the logic rule on every
            # combination of qualifying transition times
            time_combination = np.array(list(itertools.product(*transition_time_dic.values())))
            time_combination_dic = {}
            for i, idx in enumerate(list(transition_time_dic.keys())):
                time_combination_dic[idx] = time_combination[:, i]
            temporal_kernel = np.ones(len(time_combination))
            for idx, temporal_relation_idx in enumerate(template['temporal_relation_idx']):
                time_difference = time_combination_dic[temporal_relation_idx[0]] - time_combination_dic[temporal_relation_idx[1]]
                if template['temporal_relation_type'][idx] == self.BEFORE:
                    temporal_kernel *= (time_difference < -self.Time_tolerance) * np.exp(-self.decay_rate * (cur_time - time_combination_dic[temporal_relation_idx[0]]))
                if template['temporal_relation_type'][idx] == self.EQUAL:
                    temporal_kernel *= (abs(time_difference) <= self.Time_tolerance) * np.exp(-self.decay_rate * (cur_time - time_combination_dic[temporal_relation_idx[0]]))
                if template['temporal_relation_type'][idx] == self.AFTER:
                    temporal_kernel *= (time_difference > self.Time_tolerance) * np.exp(-self.decay_rate * (cur_time - time_combination_dic[temporal_relation_idx[1]]))
            feature = torch.tensor([np.sum(temporal_kernel)], dtype=torch.float64)
        return feature
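
    # Small numeric illustration of the temporal kernel, using the defaults
    # Time_tolerance = 0.5 and decay_rate = 1: for Before(0, 3), a predicate-0
    # transition at t = 1.0, and cur_time = 4.0 (so the head time is 4.0),
    # time_difference = 1.0 - 4.0 = -3.0 < -0.5, and the combination
    # contributes exp(-(4.0 - 1.0)) = exp(-3.0) to the feature sum.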
    def get_formula_effect(self, cur_time, head_predicate_idx, history, template):
        ## Note: this part is very important!! For the generator, the index should be
        ## np.sum(cur_time > head_transition_time) - 1, since at a transition time we
        ## use the intensity function right before that transition time.
        head_transition_time = np.array(history[head_predicate_idx]['time'])
        head_transition_state = np.array(history[head_predicate_idx]['state'])
        if len(head_transition_time) == 0:
            cur_state = 0
            counter_state = 1 - cur_state
        else:
            idx = np.sum(cur_time > head_transition_time) - 1
            cur_state = head_transition_state[idx]
            counter_state = 1 - cur_state
        # head_predicate_sign is a one-element list, so compare against its entry
        if counter_state == template['head_predicate_sign'][0]:
            formula_effect = torch.tensor([1], dtype=torch.float64)
        else:
            formula_effect = torch.tensor([-1], dtype=torch.float64)
        return formula_effect
    def log_likelihood(self, dataset, sample_ID_batch, T_max):
        log_likelihood = torch.tensor([0], dtype=torch.float64)
        # iterate over samples
        for sample_ID in sample_ID_batch:
            data_sample = dataset[sample_ID]
            # iterate over head predicates; each head predicate corresponds to one intensity
            for head_predicate_idx in self.head_predicate_set:
                intensity_log_sum = self.intensity_log_sum(head_predicate_idx, data_sample)
                intensity_integral = self.intensity_integral(head_predicate_idx, data_sample, T_max)
                log_likelihood += (intensity_log_sum - intensity_integral)
        return log_likelihood
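
    # This is the standard point-process log-likelihood: for each sample and
    # each head predicate h,
    #   log L = sum_i log lambda_h(t_i) - integral over [0, T_max] of lambda_h(t) dt,
    # where the t_i are the observed transition times of h.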
    def intensity_log_sum(self, head_predicate_idx, data_sample):
        intensity_transition = []
        for t in data_sample[head_predicate_idx]['time'][1:]:
            cur_intensity = self.intensity(t, head_predicate_idx, data_sample)
            intensity_transition.append(cur_intensity)
        if len(intensity_transition) == 0:  # only the survival term; no event happens
            log_sum = torch.tensor([0], dtype=torch.float64)
        else:
            log_sum = torch.sum(torch.log(torch.cat(intensity_transition, dim=0)))
        return log_sum
    def intensity_integral(self, head_predicate_idx, data_sample, T_max):
        start_time = 0
        end_time = T_max
        intensity_grid = []
        for t in np.arange(start_time, end_time, self.integral_resolution):
            cur_intensity = self.intensity(t, head_predicate_idx, data_sample)
            intensity_grid.append(cur_intensity)
        integral = torch.sum(torch.cat(intensity_grid, dim=0) * self.integral_resolution)
        return integral
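
    # The integral is a left-endpoint Riemann sum: with T_max = 10 (as in the
    # main block below) and integral_resolution = 0.03, the intensity is
    # evaluated at ceil(10 / 0.03) = 334 grid points and summed with
    # weight 0.03.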
    ### the following functions are for optimization
    def optimize_log_likelihood(self, dataset, sample_ID_batch, T_max, optimizer):
        optimizer.zero_grad()  # reset gradients at the start of a new mini-batch
        loss = -self.log_likelihood(dataset, sample_ID_batch, T_max)
        loss.backward()
        optimizer.step()
        return loss
if __name__ == "__main__":
    logic_model_generator = Logic_Model_Generator()
    num_samples = 5000
    time_horizon = 10
    batch_size = 20
    num_batch = num_samples // batch_size  # integer division keeps the batch indices integral
    data = np.load('data.npy', allow_pickle=True).item()
    logic_model = Logic_Model()
    optimizer = optim.Adam([logic_model.model_parameter[3]['base'],
                            logic_model.model_parameter[3][0]['weight'],
                            logic_model.model_parameter[3][1]['weight'],
                            logic_model.model_parameter[4]['base'],
                            logic_model.model_parameter[4][0]['weight']], lr=0.03)
    for iteration in range(10000):
        for batch_idx in np.arange(0, num_batch, 1):
            indices = np.arange(batch_idx * batch_size, (batch_idx + 1) * batch_size, 1)
            loss = logic_model.optimize_log_likelihood(data, indices, time_horizon, optimizer)
            print('loss is', loss)
            print('base 1 of logic 1 is', logic_model.model_parameter[3]['base'])
            print('weight 1 of logic 1 is', torch.exp(logic_model.model_parameter[3][0]['weight']) / (torch.exp(logic_model.model_parameter[3][0]['weight']) + torch.exp(logic_model.model_parameter[3][1]['weight'])))
            print('weight 2 of logic 1 is', torch.exp(logic_model.model_parameter[3][1]['weight']) / (torch.exp(logic_model.model_parameter[3][0]['weight']) + torch.exp(logic_model.model_parameter[3][1]['weight'])))
            print('base 2 is', logic_model.model_parameter[4]['base'])
            # head predicate 4 has a single formula, so its softmax weight is identically 1
            print('weight 3 is', torch.exp(logic_model.model_parameter[4][0]['weight']) / torch.exp(logic_model.model_parameter[4][0]['weight']))
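
    # Note: 'data.npy' is assumed to exist before this script runs. A minimal
    # sketch of producing it, assuming Logic_Model_Generator exposes a
    # generation method of roughly this shape (the method name and signature
    # are an assumption, not the confirmed API of generate_synthetic_data):
    #
    #   data = logic_model_generator.generate_data(num_samples, time_horizon)
    #   np.save('data.npy', data)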