-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnew_neoclassical_human_capital_matern.py
235 lines (189 loc) · 9.03 KB
/
new_neoclassical_human_capital_matern.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import jax
import jax.numpy as jnp
import numpy as np
import pyomo.environ as pyo
from pyomo.opt import TerminationCondition
import jsonargparse
from jax import config
from kernels import integrated_matern_kernel_matrices
from typing import List, Optional
config.update("jax_enable_x64", True)
from scipy.optimize import fsolve
def human_capital_matern(
a_k: float = 1 / 3,
a_h: float = 1 / 4,
delta_k: float = 0.1,
delta_h: float = 0.05,
rho_hat: float = 0.11, #discount rate
k_0: float = 1.5, #3.0024724187979452, #1.5,
nu: float = 0.5,
sigma: float = 1.0,
rho: float = 10,
solver_type: str = "ipopt",
train_T: float = 80.0,
train_points: int = 81,
test_T: float = 100.0,
test_points: int = 100,
benchmark_T: float = 60.0,
benchmark_points: int = 300,
train_points_list: Optional[List[float]] = None,
verbose: bool = False,
):
# if passing in `train_points` then doesn't us a grid. Otherwise, uses linspace
if train_points_list is None:
train_data = jnp.linspace(0, train_T, train_points)
else:
train_data = jnp.array(train_points_list)
test_data = jnp.linspace(0, test_T, test_points)
benchmark_grid = jnp.linspace(0, benchmark_T, benchmark_points)
# Construct kernel matrices
N = len(train_data)
K, K_tilde = integrated_matern_kernel_matrices(
train_data, train_data, nu, sigma, rho
)
K = np.array(K) # pyomo doesn't support jax arrays
K_tilde = np.array(K_tilde)
# Production function
def f(k, h):
return (k**a_k) * (h**a_h)
def f_k(k, h):
return (a_k * k**(a_k - 1)) * (h**a_h)
def f_h(k, h):
return (k**a_k) * (a_h * h**(a_h - 1))
def no_arbitrage_constraint(h):
return f_h(k_0, h) - f_k(k_0, h) - delta_h + delta_k
initial_guess = [k_0]
result = fsolve(no_arbitrage_constraint, initial_guess)
h_0 = result[0]
# Create pyomo model and variables
m = pyo.ConcreteModel()
m.I = range(N)
# 8 variables
m.alpha_k = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeffs for x_k(t): we call it in the code k(t)
m.alpha_h = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeffs for x_h(t): we call it in the code h(t)
m.alpha_mu_k = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeffs for \mu_k(t)
m.alpha_mu_h = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeffs for \mu_h(t)
m.alpha_i_k = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeff for y_k(t): we call it i_k(t) in the code
m.alpha_i_h = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeff for y_h(t): we call it i_h(t) in the code
m.alpha_c = pyo.Var(m.I, within=pyo.Reals, initialize = 0.0) # coeff for for y_c(t): we call it c(t) in the code
#initializations of variables at 0 for 6 variables: h_0 and k_0 are given to us from the economic setup
m.i_k_0 = pyo.Var(within=pyo.NonNegativeReals, initialize=delta_k * k_0) #y_k(0)
m.i_h_0 = pyo.Var(within=pyo.NonNegativeReals, initialize=delta_h * h_0)#y_h(0)
m.c_0 = pyo.Var(within=pyo.NonNegativeReals, initialize = f(k_0, h_0) - delta_h * h_0 - delta_k * k_0) #y_c(0)
m.mu_k_0 = pyo.Var(within=pyo.NonNegativeReals, initialize = 1/(f(k_0, h_0) - delta_h * h_0 - delta_k * k_0)) #\mu(0) = 1/c(0)
m.mu_h_0 = pyo.Var(within=pyo.NonNegativeReals, initialize = 1/(f(k_0, h_0) - delta_h * h_0 - delta_k * k_0)) #\mu(0) = 1/c(0)
# defining the integrated kernel functional approximation
def k(m, i): #x_k(t): physical capital k(t)
return k_0 + sum(K_tilde[i, j] * m.alpha_k[j] for j in m.I)
def h(m, i): #x_h(t): human capital h(t)
return h_0 + sum(K_tilde[i, j] * m.alpha_h[j] for j in m.I)
def i_k(m, i): # y_k(t): investment in physical capital i_k(t)
return m.i_k_0 + sum(K_tilde[i, j] * m.alpha_i_k[j] for j in m.I)
def i_h(m, i): # y_h(t): investment in human capital i_h(t)
return m.i_h_0 + sum(K_tilde[i, j] * m.alpha_i_h[j] for j in m.I)
def c(m, i): #y_c(t): consumption, c(t)
return m.c_0 + sum(K_tilde[i, j] * m.alpha_c[j] for j in m.I)
def mu_k(m, i): #\mu(t): co-state variable
return m.mu_k_0 + sum(K_tilde[i, j] * m.alpha_mu_k[j] for j in m.I)
def mu_h(m, i): #the transversality variable for physical capital
return m.mu_h_0 + sum(K_tilde[i, j] * m.alpha_mu_h[j] for j in m.I)
# defining the derivatives of the variables
def dk_dt(m, i): # \dot{x}_k(t) : derivative of the physical capital
return sum(K[i, j] * m.alpha_k[j] for j in m.I)
def dh_dt(m, i): # \dot{x}_h(t) : derivative of the human capital
return sum(K[i, j] * m.alpha_h[j] for j in m.I)
def dmu_k_dt(m, i):
return sum(K[i, j] * m.alpha_mu_k[j] for j in m.I)
def dmu_h_dt(m, i):
return sum(K[i, j] * m.alpha_mu_h[j] for j in m.I)
# defining constraints and objective for model and solve
@m.Constraint(m.I) # for each index in m.I
def dk_dt_constraint(m, i):
return dk_dt(m, i) == i_k(m, i) - delta_k * k(m, i)
@m.Constraint(m.I) # for each index in m.I
def dh_dt_constraint(m, i):
return dh_dt(m, i) == i_h(m, i) - delta_h * h(m, i)
@m.Constraint(m.I) # for each index in m.I
def dmu_k_dt_constraint(m, i):
return dmu_k_dt(m, i) == -mu_k(m, i) * (f_k(k(m, i), h(m, i)) - delta_k - rho_hat)
@m.Constraint(m.I) # for each index in m.I
def dmu_h_dt_constraint(m, i):
return dmu_h_dt(m, i) == -mu_h(m, i) * (f_h(k(m, i), h(m, i)) - delta_h - rho_hat)
@m.Constraint(m.I) # for each index in m.I
def feasibility(m, i):
return 0.0 == c(m, i) + i_h(m, i) + i_k(m, i) - f(k(m, i), h(m, i))
@m.Constraint(m.I) # for each index in m.I
def shadow_price(m, i):
return mu_k(m, i) * c(m, i) -1.0 == 0.0
@m.Constraint(m.I) # for each index in m.I
def b_h_constraint(m, i):
return mu_k(m, i) - mu_h(m, i) == 0.0
@m.Objective(sense=pyo.minimize)
def min_norm(m): # alpha @ K @ alpha not supported by pyomo
return sum(K[i, j] * m.alpha_mu_k[i] * m.alpha_mu_k[j] for i in m.I for j in m.I) + sum(K[i, j] * m.alpha_k[i] * m.alpha_k[j] for i in m.I for j in m.I)+ sum(K[i, j] * m.alpha_mu_h[i] * m.alpha_mu_h[j] for i in m.I for j in m.I)+sum(K[i, j] * m.alpha_h[i] * m.alpha_h[j] for i in m.I for j in m.I)
solver = pyo.SolverFactory(solver_type)
options = (
{
'bound_relax_factor': 0,
}
) # can add options here. See https://coin-or.github.io/Ipopt/OPTIONS.html#OPTIONS_AMPL
results = solver.solve(m, tee=verbose, options=options)
if not results.solver.termination_condition == TerminationCondition.optimal:
print(str(results.solver)) # raise exception?
alpha_c = jnp.array([pyo.value(m.alpha_c[i]) for i in m.I])
alpha_k = jnp.array([pyo.value(m.alpha_k[i]) for i in m.I])
alpha_h = jnp.array([pyo.value(m.alpha_h[i]) for i in m.I])
alpha_i_k = jnp.array([pyo.value(m.alpha_i_k[i]) for i in m.I])
alpha_i_h = jnp.array([pyo.value(m.alpha_i_h[i]) for i in m.I])
alpha_mu_k = jnp.array([pyo.value(m.alpha_mu_k[i]) for i in m.I])
alpha_mu_h = jnp.array([pyo.value(m.alpha_mu_h[i]) for i in m.I])
c_0 = pyo.value(m.c_0)
i_k_0 = pyo.value(m.i_k_0)
i_h_0 = pyo.value(m.i_h_0)
mu_k_0 = pyo.value(m.mu_k_0)
mu_h_0 = pyo.value(m.mu_h_0)
# Interpolator using training data
#@jax.jit
def kernel_solution(test_data):
# pointwise comparison test_data to train_data
K_test, K_tilde_test = integrated_matern_kernel_matrices(
test_data, train_data, nu, sigma, rho
)
c_test = c_0 + K_tilde_test @ alpha_c
k_test = k_0 + K_tilde_test @ alpha_k
h_test = h_0 + K_tilde_test @ alpha_h
i_k_test = i_k_0 + K_tilde_test @ alpha_i_k
i_h_test = i_h_0 + K_tilde_test @ alpha_i_h
mu_k_test = mu_k_0 + K_tilde_test @ alpha_mu_k
mu_h_test = mu_h_0 + K_tilde_test @ alpha_mu_h
feasibility_test = c_test + i_h_test + i_k_test - f(k_test, h_test)
return k_test, h_test, c_test, i_k_test, i_h_test, mu_k_test, mu_h_test, feasibility_test
# Generate test_data
k_test, h_test, c_test, i_k_test, i_h_test, mu_k_test, mu_h_test, feasibility_test = kernel_solution(test_data)
print(
f"solve_time(s) = {results.solver.Time}"
)
return {
"t_train": train_data,
"t_test": test_data,
"k_test": k_test,
"h_test": h_test,
"c_test": c_test,
"i_k_test": i_k_test,
"i_h_test": i_h_test,
"mu_k_test": mu_k_test,
"mu_h_test": mu_h_test,
"feasibility_test": feasibility_test,
"alpha_c": alpha_c,
"alpha_k": alpha_k,
"alpha_h": alpha_h,
"alpha_i_k": alpha_i_k,
"alpha_i_h": alpha_i_h,
"c_0": c_0,
"i_k_0": i_k_0,
"i_h_0": i_h_0,
"solve_time": results.solver.Time,
"kernel_solution": kernel_solution, # interpolator
}
if __name__ == "__main__":
jsonargparse.CLI(human_capital_matern)