-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathrace_detection_main.py
312 lines (288 loc) · 15.5 KB
/
race_detection_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
'''
race_detection_main.py
Main Python script to run Race Detection.
This includes:
(1) detecting races
(2) producing race-level statistics.
Reference:
Code and Data Appendix for “Quantifying the High-Frequency Trading ‘Arms Race’“
by Matteo Aquilina, Eric Budish and Peter O’Neill.
Please follow Section 7 of the Code and Data Appendix to run this script.
Environment: MacOS / Linux;
Our code does not support the Windows operating system.
We recommend Windows 10 users set up Windows Subsystem for Linux and
run the code in the virtual Linux system.
Dependency: Python (>=3.6), Pandas (>=1.1.0), Numpy (>=1.18.0)
Instructions:
0. Pre-process the data and run message_data_processing_main.py:
Please follow Section 3 of the Code and Data Appendix for data pre-processing,
Section 5 of the Code and Data Appendix for computing environment setup,
and Section 6 of the Code and Data Appendix to run message_data_processing_main.py.
1. Set Parameters:
1.1 Set Paths: Create the following directories and set the paths
accordingly in the "Set path" section in this script below.
(1) file_ticktables = 'path/to/Ticktables.pkl' and
file_symdates = 'path/to/All_SymDates.csv.gz':
Paths to the reference data files.
Please refer to Section 3.4 of the Code and Data Appendix
for the preparation of the reference data files,
and Section 5.2 of the Code and Data Appendix
for the file structure setup.
This MUST be the same file as in process_msg_data_main.py.
(2) path_logs = 'path/to/Logs/':
Directory for log files. Everytime the user runs the script, the
program automatically creates a subfolder inside /Logs for
the log files generated in that run. Log files can be
used for debugging and progress monitoring.
WARNING: Do NOT remove files from this directory before
the analysis is finished.
It is recommended to use the same folder for log files as in
process_msg_data_main.py.
(3) path_temp = 'path/to/Temp/':
Directory for intermediate files. This includes the event
classification and order book reconstruction results.
This MUST be the same folder as in process_msg_data_main.py.
WARNING: Do NOT remove files from this directory before
the analysis is finished.
(4) path_output = 'path/to/Output':
Directory for the output files (race-level stats).
The program will aggregate the race-level
statistics and output the dataset to this directory.
Please refer to Section 8 of the Code and Data Appendix
for a complete list of output statistics.
It is recommended to use the same folder for output files as in
process_msg_data_main.py.
1.2 Set Technical Parameters: Set the following technical parameters
in the section "Specify Technical Parameters" in this script below.
Please refer to Section 7.1 of the Code and Data Appendix.
(1) num_workers: The number of cores to use for multi-processing.
(2) max_dec_scale: The max decimal scale of price-related variables.
This parameter is used to avoid floating point errors.
This parameter MUST have the same value as in process_msg_data_main.py.
1.3 Set Race Detection Parameters: Construct a .csv file with the race
detection parameters for each race run to explore. Each column of the
.csv file should represent a race parameter in Table 2 of the Code and Data Appendix
with the variable names as the column headers. Each row should represent
a race run to be explored with the parameters for the run specified in each column.
Please refer to Section 4 of the Code and Data Appendix.
2. Execute the Code: Please follow Section 7.2 of the Code and Data Appendix.
We recommend users to run the code on a small sample before running on a full sample.
The output files of the Python program will be saved to path_output.
'''
###################################################################################
################################## IMPORT MODULES #################################
###################################################################################
import multiprocessing
import datetime
import pprint
import traceback
import os
import pandas as pd
import random
from LatencyArbitrageAnalysis.Race_Detection_and_Statistics import detect_races_and_race_statistics
from LatencyArbitrageAnalysis.utils.Logger import getLogger
from LatencyArbitrageAnalysis.utils.Monitor_Logs import MonitorLogs
from LatencyArbitrageAnalysis.utils.Collect_Statistics import collect_stats
from LatencyArbitrageAnalysis.utils.Dtypes import dtypes_msgs, dtypes_top
###################################################################################
################################## SET PARAMETERS #################################
###################################################################################
# This section allows users to specify paths, # cores, max decimal scale of
# the price-related variables.
###################################################################################
###### Set Paths
### File Path to Ticktables.pkl
# This MUST be the same file in message_data_processing_main.py.
# tick tables - dict of {(date, sym): ticktable}
file_ticktables = '/path/to/Ticktables.pkl'
### File Path to All_SymDates.csv.gz
# This MUST be the same file in message_data_processing_main.py.
# A .csv file containing a list of all symbol-dates in the data.
# This file should have two columns,
# 'Symbol' for the symbol, and
# 'Date' for the date.
# E.g.,
# | Date |Symbol|
# ---------------------
# | 2000-01-01 | ABCD |
# | 2000-01-02 | ABCD |
# | 2000-01-01 | EFGH |
# | 2000-01-02 | EFGH |
file_symdates = '/path/to/All_SymDates.csv.gz'
### Path to log file directory
path_logs = '/path/to/Logs/'
### Path to intermediate file directory
# Note: This MUST be the same path_temp in message_data_processing_main.py
# because race_detection_main.py makes use of the intermediate
# output files generated by message_data_processing_main.py saved in path_temp.
path_temp = '/path/to/Temp/'
### Path to the output file directory
path_output = '/path/to/Output/'
###################################################################################
###### Specify Technical Parameters
###### Multi-Processing Cores to Use
num_workers = 1
###### Max Decimal Scale of the Price-Related Variables
# For example, if the prices are quoted as 12.34560 with at most 5 digits
# after the decimal point, then max_dec_scale = 5
# This should be the scale of the smallest tick size in the data.
# This MUST have the same value in message_data_processing_main.py.
max_dec_scale = 5
###################################################################################
###### Set Race Detection Parameters
### File Path to Race Detection Parameter .csv file
file_race_params = '/path/to/Input_Race_Parameters.csv'
###################################################################################
###################################################################################
# Warning: Please do not modify the script after this line unless
# you fully understand the code and know what you are doing
###################################################################################
###################################################################################
###################################################################################
################################### MAIN PROGRAM ##################################
###################################################################################
# Define the main function for race detection
def main(runtime, date, sym, args, paths):
# Initialize logger
logpath = '%s/%s/' %(paths['path_logs'], 'RaceDetection_'+runtime)
logfile = 'Process_Msg_Data_Main_Log_%s_%s_%s.log' % (runtime, date, sym)
logger = getLogger(logpath, logfile, __name__)
timer_st, pid = datetime.datetime.now(), os.getpid()
logger.info('Processing: %s %s' % (date, sym))
logger.info('Timer Start: %s' % (str(timer_st)))
logger.info('Process ID: %s' % (pid))
# Race Detection
logger.info('Launching race detection and statistics...')
run_process(logger, detect_races_and_race_statistics, runtime, date, sym, args, paths)
timer_end = datetime.datetime.now()
logger.info('Complete.')
logger.info('Timer End: %s' % str(timer_end))
logger.info('Time Elapsed: %s' % str(timer_end - timer_st))
ans = [date, sym, pid, timer_st, timer_end]
return (ans)
# Define the process runner
def run_process(logger, process, runtime, date, sym, args, paths):
try:
process(runtime, date, sym, args, paths)
except (SystemExit, KeyboardInterrupt):
logger.error('Failed. SystemExit or KeyboardInterrupt')
return None
except Exception as e:
logger.error('Failed. Error: %s' % e)
logger.error(traceback.format_exc())
return None
# Define the multiprocessing wrapper
def multi_process_wrapper(args):
return main(*args)
# Define a helper function for file structure
def create_folder_structure(pairs, temp_path_list):
for path in temp_path_list:
if not os.path.exists(path):
os.makedirs(path)
dates = set([date for date, sym in pairs])
for date in dates:
if not os.path.exists('%s/%s/' % (path, date)):
os.makedirs('%s/%s/' % (path, date))
###################################################################################
#################################### EXECUTION ####################################
###################################################################################
if __name__ == '__main__':
# Read in Race Detection Parameters
race_params = pd.read_csv(file_race_params)
num_remaining_spec = race_params.shape[0]
race_params = race_params.to_dict('row')
# Loop through each race run
for race_param in race_params:
num_remaining_spec = num_remaining_spec - 1
# Get runtime information
now = datetime.datetime.now()
runtime = '%02d%02d%02d_%02d%02d%02d' % (now.year, now.month, now.day, now.hour, now.minute, now.second)
print('The runtime is: ' + runtime)
# Set up the log folder
logpath = '%s/%s/' %(path_logs, 'RaceDetection_'+runtime)
if not os.path.exists(logpath):
os.makedirs(logpath)
else:
print('Warning: runtime %s already exists. Log files will be overwritten!' % runtime)
###################################################################################
# Organize input parameters
###### Read in Reference Data
## Read in the symbol-date pairs
pairs = pd.read_csv(file_symdates, dtype={'Date':'O','Symbol':'O'})[['Date','Symbol']].dropna().to_records(index=False).tolist()
# Shuffle the pairs for parallel processing
random.shuffle(pairs)
## Read in the tick table reference data
ticktables = pd.read_pickle(file_ticktables)
###### Construct Input
## price unit to convert price-related variables into integers by multiplication.
price_factor = int(10 ** (max_dec_scale+1))
## Construct paths
paths = {
'path_logs': path_logs,
'path_temp': path_temp,
'path_output': path_output}
## Write args and paths to a txt file for retaining parameters of runs
technical_arg_log = '%s/%s/' %(path_logs, 'TechnicalSpecifications')
if not os.path.exists(technical_arg_log):
os.makedirs(technical_arg_log)
with open(technical_arg_log+'%s.txt' % runtime, 'w') as f:
print('Running: Race Detection', file=f)
print('Arguments specified for runtime %s:' % runtime, file=f)
pprint.pprint({
'max_dec_scale': max_dec_scale,
'race_param':race_param}, f)
print('Paths specified for runtime %s:' % runtime, file=f)
pprint.pprint(paths, f)
###################################################################################
# Organize args
# Create a list of args to be passed into the multi-processing pool. Each item is
# the args for one symbol-date.
# The reason for this is that ticktables may vary across symbol-dates.
args_list = [(runtime, date, sym,
{
'dtypes_msgs': dtypes_msgs,
'dtypes_top': dtypes_top,
'price_factor': price_factor,
'race_param': race_param,
'ticktable': ticktables[(date,sym)],
},
paths) for date, sym in pairs]
###################################################################################
# Create folder structure within the temp dir if it doesn't exist
temp_path_list = ['%s/ClassifiedMsgData/' % paths['path_temp'], '%s/BBOData/' % paths['path_temp'],
'%s/DepthData/' % paths['path_temp'], '%s/RaceRecords/' % paths['path_temp'],
'%s/RaceStats/' % paths['path_temp'], '%s/SymDateStats/' % paths['path_temp'],
'%s/TradeStats/' % paths['path_temp']]
create_folder_structure(pairs, temp_path_list)
###################################################################################
# Run the main program
# start multi-processing
time_st = datetime.datetime.now()
print('#######################################################')
print('Start Race Detection: %s' % str(time_st))
print('runtime: %s' % runtime)
print('Number of remaining specifications in this run: %s' % num_remaining_spec)
print('Race Parameters:')
pprint.pprint(race_param)
pool = multiprocessing.Pool(num_workers)
results = pool.map(multi_process_wrapper, args_list)
print('Finished Detecting Races: %s' % str(datetime.datetime.now() - time_st))
###################################################################################
# Monitor logs to check if all sym-dates are finished.
# Logs will be printed out in the function call.
logs = MonitorLogs(runtime, pairs, paths)
###################################################################################
# Combine stats files
print('#################################')
time_st = datetime.datetime.now()
print('Start Collecting Race Statistics')
# Race Level Statistics
if logs[(logs['Status_Race_Detection_Statistics'] == 'Done')].shape[0] == len(pairs):
# Generate a csv file to record the race parameters of a run
pd.DataFrame.from_dict(race_param, orient = 'index' ,columns=['Value']).to_csv('%s/Parameters_%s.csv' % (paths['path_output'], runtime))
# Collect stats
collect = 'Race_Stats'
collect_stats(runtime, paths, pairs, collect)
print('Runtime %s Finished Collecting Race Statistics: %s' % (runtime, str(datetime.datetime.now() - time_st)))
else:
print('Runtime %s Did not complete Race Stats collection, use MonitorLogs to check for reasons' % (runtime))