forked from chrisconlan/algorithmic-trading-with-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathoptimization.py
235 lines (174 loc) · 7.8 KB
/
optimization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from pypm import metrics, signals, data_io, simulation
import pandas as pd
import numpy as np
from collections import defaultdict, OrderedDict
from itertools import product
from timeit import default_timer
from typing import Dict, Tuple, List, Callable, Iterable, Any, NewType, Mapping
import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
# Performance data and parameter inputs are dictionaries keyed by name.
# Parameters maps a parameter name to the single value used in one simulation.
Parameters = NewType('Parameters', Dict[str, float])
Performance = simulation.PortfolioHistory.PerformancePayload # Dict[str, float]

# The optimizer is given keyword arguments pointing to iterables of candidate
# values (SimKwargs). The simulation function itself is called with one value
# per keyword and must return a performance metric dictionary.
# The first argument to NewType must match the name it is assigned to.
SimKwargs = NewType('SimKwargs', Mapping[str, Iterable[Any]])
SimFunction = NewType('SimFunction', Callable[[SimKwargs], Performance])
class OptimizationResult(object):
    """Pairs one set of parameters with the performance it produced"""

    def __init__(self, parameters: Parameters, performance: Performance):
        # The two dictionaries get merged later, so a key present in both
        # would make one value silently overwrite the other
        shared_keys = parameters.keys() & performance.keys()
        assert not shared_keys, \
            'parameter name matches performance metric name'

        self.parameters = parameters
        self.performance = performance

    @property
    def as_dict(self) -> Dict[str, float]:
        """Parameters followed by performance metrics in a single new dict"""
        merged = dict(self.parameters)
        merged.update(self.performance)
        return merged
class GridSearchOptimizer(object):
    """
    A generic grid search optimizer that requires only a simulation function and
    a series of parameter ranges. Provides timing, summary, and plotting
    utilities with return data.
    """

    def __init__(self, simulation_function: SimFunction):
        """
        Store the simulation callable and create empty result containers.

        The callable is invoked by optimize() with one keyword argument per
        parameter and must return a dictionary of performance metrics.
        """
        self.simulate = simulation_function
        self._results_list: List[OptimizationResult] = list()
        self._results_df = pd.DataFrame()
        self._optimization_finished = False

        # Set by optimize() and by the results property respectively. Defined
        # here so they always exist, whatever order the methods are called in.
        self.param_names: List[str] = []
        self.metric_names: List[str] = []

    def add_results(self, parameters: Parameters, performance: Performance):
        """
        Record the outcome of a single simulation run.
        """
        _results = OptimizationResult(parameters, performance)
        self._results_list.append(_results)

    def optimize(self, **optimization_ranges: Iterable[Any]):
        """
        Run the simulation once for every combination of the given values.

        Each keyword names a simulation parameter and points to an iterable of
        candidate values. Progress and timing are printed while running.
        Results are appended to those of any earlier call.
        """
        assert optimization_ranges, 'Must provide non-empty parameters.'

        # Convert all iterables to lists
        param_ranges = {k: list(v) for k, v in optimization_ranges.items()}
        self.param_names = param_names = list(param_ranges.keys())

        # Count total simulations
        range_sizes = [len(r) for r in param_ranges.values()]
        total_simulations = int(np.prod(range_sizes))

        total_time_elapsed = 0

        print('Starting simulation ...')
        print(f'Simulating 1 / {total_simulations} ...', end='\r')
        for i, params in enumerate(product(*param_ranges.values())):
            if i > 0:
                avg_time = total_time_elapsed / i
                # i runs are done, so the run about to start still counts
                remaining_time = (total_simulations - i) * avg_time
                s = f'Simulating {i+1} / {total_simulations} ... '
                s += f'{remaining_time:.0f}s remaining ({avg_time:.1f}s avg)'
                # Trailing spaces overwrite leftovers of a longer previous line
                s += ' '*8
                print(s, end='\r')

            timer_start = default_timer()

            parameters = dict(zip(param_names, params))
            results = self.simulate(**parameters)
            self.add_results(parameters, results)

            timer_end = default_timer()
            total_time_elapsed += timer_end - timer_start

        print(f'Simulated {total_simulations} / {total_simulations} ...')
        print(f'Elapsed time: {total_time_elapsed:.0f}s')
        print('Done.')

        # Drop any frame cached from an earlier run so it gets rebuilt
        self._results_df = pd.DataFrame()
        self._optimization_finished = True

    def _assert_finished(self):
        """
        Guard for methods that need optimize() to have been run.
        """
        assert self._optimization_finished, \
            'Run self.optimize before accessing this method.'

    @property
    def results(self) -> pd.DataFrame:
        """
        One row per simulation, with parameter and metric columns.

        The frame is cached and rebuilt whenever its row count no longer
        matches the number of recorded results. Rebuilding also refreshes
        self.metric_names.
        """
        self._assert_finished()
        if len(self._results_df) != len(self._results_list):
            rows = [r.as_dict for r in self._results_list]
            self._results_df = pd.DataFrame(rows)

            # Every column that is not a parameter is a metric. Keeping the
            # column order makes the list deterministic.
            _params = set(self.param_names)
            self.metric_names = [
                c for c in self._results_df.columns if c not in _params
            ]
        return self._results_df

    def _filter_results(self, filters: Mapping[str, Any]) -> pd.DataFrame:
        """
        Keep only the result rows whose columns equal the given values.
        """
        df = self.results
        for column, value in filters.items():
            # Index by column name: attribute access breaks on names that
            # collide with DataFrame attributes or are not identifiers
            df = df[df[column] == value]
        return df

    def print_summary(self):
        """
        Print descriptive statistics for every performance metric.
        """
        df = self.results
        metric_names = self.metric_names

        print('Summary statistics')
        print(df[metric_names].describe().T)

    def get_best(self, metric_name: str) -> pd.DataFrame:
        """
        Sort the results by a specific performance metric

        Returns the parameter columns plus the chosen metric, sorted by that
        metric in descending order.
        """
        # Reading results first guarantees self.metric_names is up to date
        results = self.results
        assert metric_name in self.metric_names, 'Not a performance metric'

        partial_df = results[self.param_names + [metric_name]]
        return partial_df.sort_values(metric_name, ascending=False)

    def plot_1d_hist(self, x: str, show=True):
        """
        Plot a histogram of a single results column.
        """
        self.results.hist(x)
        if show:
            plt.show()

    def plot_2d_line(self, x: str, y: str, show=True, **filter_kwargs):
        """
        Plot y against x as a line, restricted to rows matching the filters.
        """
        _results = self._filter_results(filter_kwargs)

        ax = _results.plot(x, y)
        if filter_kwargs:
            k_str = ', '.join([f'{k}={v}' for k, v in filter_kwargs.items()])
            # NOTE(review): the legend entry is named after x although the
            # plotted series is y -- confirm this is intended
            ax.legend([f'{x} ({k_str})'])

        if show:
            plt.show()

    def plot_2d_violin(self, x: str, y: str, show=True):
        """
        Group y along x then plot violin charts
        """
        # groupby yields the groups ordered by ascending x value
        labels = []
        datasets = []
        for x_value, y_values in self.results.groupby(x)[y]:
            labels.append(x_value)
            datasets.append(y_values.tolist())

        fig, ax = plt.subplots()

        ax.violinplot(dataset=datasets, showmedians=True)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        # Violins sit at positions 1..n, so tick 0 gets an empty label
        ax.set_xticks(range(0, len(labels)+1))
        ax.set_xticklabels([''] + labels)
        if show:
            plt.show()

    def plot_3d_mesh(self, x: str, y: str, z: str, show=True,
                     **filter_kwargs):
        """
        Plot interactive 3d mesh. z axis should typically be performance metric
        """
        _results = self._filter_results(filter_kwargs)

        fig = plt.figure()
        # Ask the figure for the 3d axes: constructing Axes3D(fig) directly
        # does not attach the axes to the figure on current matplotlib
        ax = fig.add_subplot(111, projection='3d')

        X, Y, Z = [_results[attr] for attr in (x, y, z)]
        ax.plot_trisurf(X, Y, Z, cmap=cm.jet, linewidth=0.2)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        ax.set_zlabel(z)
        if show:
            plt.show()

    def plot(self, *attrs: str, show=True, **filter_kwargs: Any):
        """
        Attempt to intelligently dispatch plotting functions based on the number
        and type of attributes. Last argument should typically be the
        performance metric.

        One name draws a histogram. Two names draw a line when there is a
        single parameter or filters are given, otherwise violins. Three names
        draw a 3d mesh. Any other count raises ValueError.
        """
        self._assert_finished()
        # Building the results frame is what populates self.metric_names
        _ = self.results
        param_names = self.param_names
        metric_names = self.metric_names

        if len(attrs) == 3:
            assert attrs[0] in param_names and attrs[1] in param_names, \
                'First two positional arguments must be parameter names.'
            assert attrs[2] in metric_names, \
                'Last positional argument must be a metric name.'
            assert len(filter_kwargs) + 2 == len(param_names), \
                'Must filter remaining parameters. e.g. p_three=some_number.'
            self.plot_3d_mesh(*attrs, show=show, **filter_kwargs)

        elif len(attrs) == 2:
            if len(param_names) == 1 or filter_kwargs:
                self.plot_2d_line(*attrs, show=show, **filter_kwargs)
            elif len(param_names) > 1:
                self.plot_2d_violin(*attrs, show=show)

        elif len(attrs) == 1:
            self.plot_1d_hist(*attrs, show=show)

        else:
            raise ValueError('Must pass between one and three column names.')