# StatusParser.py
import json
import os
import time
from datetime import datetime, timedelta
import queue
from sys import platform
import threading
from time import sleep
from EDAP_data import *
from EDlogger import logger
from Voice import Voice
from WindowsKnownPaths import *
class StatusParser:
    """ Parses the Status.json file generated by the game.
    Thanks go to Rude. See source at 'https://github.com/RatherRude/Elite-Dangerous-AI-Integration'.

    The file is re-read only when its modification time changes; the most
    recent cleaned snapshot is kept in 'current_data' and the one before it
    in 'last_data'.
    """

    # Names of the 'Flags' bits, ordered by bit position (index 0 == value 1).
    _FLAG_NAMES = (
        "Docked",
        "Landed",
        "Landing Gear Down",
        "Shields Up",
        "Supercruise",
        "FlightAssist Off",
        "Hardpoints Deployed",
        "In Wing",
        "Lights On",
        "Cargo Scoop Deployed",
        "Silent Running",
        "Scooping Fuel",
        "Srv Handbrake",
        "Srv using Turret view",
        "Srv Turret retracted (close to ship)",
        "Srv DriveAssist",
        "Fsd MassLocked",
        "Fsd Charging",
        "Fsd Cooldown",
        "Low Fuel (< 25%)",
        "Over Heating (> 100%)",
        "Has Lat Long",
        "IsInDanger",
        "Being Interdicted",
        "In MainShip",
        "In Fighter",
        "In SRV",
        "Hud in Analysis mode",
        "Night Vision",
        "Altitude from Average radius",
        "Fsd Jump",
        "Srv HighBeam",
    )

    # Names of the 'Flags2' bits, ordered by bit position (index 0 == value 1).
    _FLAG2_NAMES = (
        "OnFoot",
        "InTaxi",
        "InMulticrew",
        "OnFootInStation",
        "OnFootOnPlanet",
        "AimDownSight",
        "LowOxygen",
        "LowHealth",
        "Cold",
        "Hot",
        "VeryCold",
        "VeryHot",
        "Glide Mode",
        "OnFootInHangar",
        "OnFootSocialSpace",
        "OnFootExterior",
        "BreathableAtmosphere",
        "Telepresence Multicrew",
        "Physical Multicrew",
        "Fsd hyperdrive charging",
        "Flags2Future20",
        "Flags2Future21",
        "Flags2Future22",
        "Flags2Future23",
        "Flags2Future24",
        "Flags2Future25",
        "Flags2Future26",
        "Flags2Future27",
        "Flags2Future28",
        "Flags2Future29",
        "Flags2Future30",
        "Flags2Future31",
    )

    def __init__(self, file_path=None):
        """ Sets the path of the status file and loads the first snapshot.
        @param file_path: Path of Status.json. When empty, a platform default is used.
        """
        if file_path:
            self.file_path = file_path
        elif platform != "win32":
            self.file_path = "./linux_ed/Status.json"
        else:
            # Imported here so the Windows-only lookup is needed only when
            # the default location has to be resolved.
            from WindowsKnownPaths import get_path, FOLDERID, UserHandle
            self.file_path = (get_path(FOLDERID.SavedGames, UserHandle.current)
                              + "/Frontier Developments/Elite Dangerous/Status.json")

        self.last_mod_time = None

        # Read json file data
        self.last_data = None
        self.current_data = None
        self.current_data = self.get_cleaned_data()

    #     self.watch_thread = threading.Thread(target=self._watch_file_thread, daemon=True)
    #     self.watch_thread.start()
    #     self.status_queue = queue.Queue()
    #
    # def _watch_file_thread(self):
    #     backoff = 1
    #     while True:
    #         try:
    #             self._watch_file()
    #         except Exception as e:
    #             logger.debug('An error occurred when reading status file')
    #             sleep(backoff)
    #             logger.debug('Attempting to restart status file reader after failure')
    #             backoff *= 2
    #
    # def _watch_file(self):
    #     """Detects changes in the Status.json file."""
    #     while True:
    #         status = self.get_cleaned_data()
    #         if status != self.current_data:
    #             self.status_queue.put(status)
    #             self.current_data = status
    #         sleep(1)

    def get_file_modified_time(self) -> float:
        """ Returns the modification time of the status file (seconds since the epoch). """
        return os.path.getmtime(self.file_path)

    @staticmethod
    def _decode_bits(value, names):
        """ Returns {name: True} for every bit of 'value' that is set, in bit order. """
        return {name: True for bit, name in enumerate(names) if value & (1 << bit)}

    def translate_flags(self, flags_value):
        """Translates flags integer to a dictionary of only True flags."""
        return self._decode_bits(flags_value, self._FLAG_NAMES)

    def translate_flags2(self, flags2_value):
        """Translates Flags2 integer to a dictionary of only True flags."""
        return self._decode_bits(flags2_value, self._FLAG2_NAMES)

    def transform_pips(self, pips_list):
        """Transforms the pips list to a dictionary and halves each value."""
        return {
            'system': pips_list[0] / 2,
            'engine': pips_list[1] / 2,
            'weapons': pips_list[2] / 2
        }

    def adjust_year(self, timestamp):
        """Increases the year in the timestamp by 1286 years.
        @param timestamp: Timestamp string in the form '2024-09-28T16:01:47Z'.
        @return: The same timestamp with the year increased by 1286.
        """
        # Parse the timestamp string to validate it and split out its parts
        dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
        # Build the shifted year as text instead of using dt.replace(): the
        # shifted year is not always a leap year, so replace() raises
        # ValueError for 29 February.
        return f"{dt.year + 1286:04d}" + dt.strftime("-%m-%dT%H:%M:%SZ")

    def _read_status_file(self):
        """ Reads and decodes the status file, retrying with a doubling delay until it succeeds. """
        backoff = 1
        while True:
            try:
                with open(self.file_path, 'r', encoding='utf-8') as file:
                    return json.load(file)
            except (OSError, ValueError):
                # OSError: file could not be opened or read.
                # ValueError: invalid JSON or undecodable bytes.
                logger.debug('An error occurred when reading Status.json file')
                sleep(backoff)
                logger.debug('Attempting to restart status file reader after failure')
                backoff *= 2

    def get_cleaned_data(self):
        """Loads data from the JSON file and returns cleaned data with only the necessary fields.
        {
        "timestamp":"2024-09-28T16:01:47Z",
        "event":"Status",
        "Flags":150994968,
        "Flags2":0,
        "Pips":[4,4,4],
        "FireGroup":0,
        "GuiFocus":0,
        "Fuel":
        {
            "FuelMain":36.160004,
            "FuelReservoir":0.534688
        },
        "Cargo":728.000000,
        "LegalState":"Clean",
        "Balance":3119756215,
        "Destination":
        {
            "System":7267487524297,
            "Body":12,
            "Name":"P.T.N. PERSEVERANCE TFK-N3G"
        }
        }
        @return: The cleaned data dictionary; the cached one if the file has not changed.
        """
        # Sample the modification time BEFORE reading. If the file is written
        # again while it is being read, the stored time is older than the
        # file's and the next call re-reads it instead of missing the change.
        mod_time = self.get_file_modified_time()

        # Check if file changed
        if mod_time == self.last_mod_time:
            #logger.debug(f'Status.json mod timestamp {self.last_mod_time} unchanged.')
            return self.current_data

        # Read file
        data = self._read_status_file()

        # Combine flags from Flags and Flags2 into a single dictionary
        # combined_flags = {**self.translate_flags(data['Flags'])}
        # if 'Flags2' in data:
        #     combined_flags = {**combined_flags, **self.translate_flags2(data['Flags2'])}

        # Initialize cleaned_data with common fields
        cleaned_data = {
            #'status': combined_flags,
            'time': (datetime.now() + timedelta(days=469711)).isoformat(),
            'Flags': data['Flags']
        }

        # Add optional status flags
        if 'Flags2' in data:
            cleaned_data['Flags2'] = data['Flags2']
        if 'Pips' in data:
            cleaned_data['pips'] = self.transform_pips(data['Pips'])
        if 'GuiFocus' in data:
            cleaned_data['GuiFocus'] = data['GuiFocus']
        if 'Cargo' in data:
            cleaned_data['cargo'] = data['Cargo']
        if 'LegalState' in data:
            cleaned_data['legalState'] = data['LegalState']
        if 'Balance' in data:
            cleaned_data['balance'] = data['Balance']
        # 'Destination' is always present in the result; None when the file has none.
        cleaned_data['Destination'] = data.get('Destination')

        # Store data
        self.last_data = self.current_data
        self.current_data = cleaned_data
        self.last_mod_time = mod_time
        #logger.debug(f'Status.json mod timestamp {self.last_mod_time} updated.')
        # print(json.dumps(data, indent=4))

        # Enable the following to print all the changes to the status flags.
        # self.log_flag_diffs()

        return cleaned_data

    def log_flag_diffs(self):
        """ Prints every Flags / Flags2 bit that changed between the last two snapshots. """
        if self.last_data is None or self.current_data is None:
            return

        old_flags = self.last_data['Flags']
        new_flags = self.current_data['Flags']
        for item in self.translate_flags(new_flags & ~old_flags):
            print(f"Status Flags: '{item}' is ON")
        for item in self.translate_flags(~new_flags & old_flags):
            print(f"Status Flags: '{item}' is OFF")

        # 'Flags2' is only stored when the file contained it, so use get()
        # rather than indexing.
        old_flags2 = self.last_data.get('Flags2')
        new_flags2 = self.current_data.get('Flags2')
        if old_flags2 is None or new_flags2 is None:
            return

        for item in self.translate_flags2(new_flags2 & ~old_flags2):
            print(f"Status Flags2: '{item}' is ON")
        for item in self.translate_flags2(~new_flags2 & old_flags2):
            print(f"Status Flags2: '{item}' is OFF")

    def get_gui_focus(self) -> int:
        """ Gets the value of the GUI Focus flag.
        The Flag constants are defined in 'EDAP_data.py'.
        """
        self.get_cleaned_data()
        return self.current_data.get('GuiFocus', 0)

    def _wait_for_flag_state(self, key: str, flag: int, want_on: bool, timeout: float) -> bool:
        """ Polls the status file until the flag reaches the wanted state.
        Returns True when the state is reached, False on a time-out or when
        the status data has no 'key' field.
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout:
            self.get_cleaned_data()
            if key not in self.current_data:
                return False
            if bool(self.current_data[key] & flag) == want_on:
                return True
            sleep(0.5)
        return False

    def wait_for_flag_on(self, flag: int, timeout: float = 15) -> bool:
        """ Waits for the selected flag to turn true.
        Returns True if the flag turns true or False on a time-out.
        The Flag constants are defined in 'EDAP_data.py'.
        @param timeout: Timeout in seconds.
        @param flag: The flag to check for.
        """
        return self._wait_for_flag_state('Flags', flag, True, timeout)

    def wait_for_flag_off(self, flag: int, timeout: float = 15) -> bool:
        """ Waits for the selected flag to turn false.
        Returns True if the flag turns false or False on a time-out.
        The Flag constants are defined in 'EDAP_data.py'.
        @param timeout: Timeout in seconds.
        @param flag: The flag to check for.
        """
        return self._wait_for_flag_state('Flags', flag, False, timeout)

    def wait_for_flag2_on(self, flag: int, timeout: float = 15) -> bool:
        """ Waits for the selected flag2 to turn true.
        Returns True if the flag turns true, or False on a time-out or when
        the status data has no 'Flags2' field.
        The Flag2 constants are defined in 'EDAP_data.py'.
        @param timeout: Timeout in seconds.
        @param flag: The flag to check for.
        """
        return self._wait_for_flag_state('Flags2', flag, True, timeout)

    def wait_for_flag2_off(self, flag: int, timeout: float = 15) -> bool:
        """ Waits for the selected flag2 to turn false.
        Returns True if the flag turns false, or False on a time-out or when
        the status data has no 'Flags2' field.
        The Flag2 constants are defined in 'EDAP_data.py'.
        @param timeout: Timeout in seconds.
        @param flag: The flag to check for.
        """
        return self._wait_for_flag_state('Flags2', flag, False, timeout)

    def get_flag(self, flag: int) -> bool:
        """ Gets the value of the selected flag.
        The Flag constants are defined in 'EDAP_data.py'.
        @param flag: The flag to check for.
        """
        self.get_cleaned_data()
        return bool(self.current_data['Flags'] & flag)

    def get_flag2(self, flag: int) -> bool:
        """ Gets the value of the selected flag2.
        Returns False when the status data has no 'Flags2' field.
        The Flag2 constants are defined in 'EDAP_data.py'.
        @param flag: The flag to check for.
        """
        self.get_cleaned_data()
        if 'Flags2' in self.current_data:
            return bool(self.current_data['Flags2'] & flag)
        else:
            return False
# Usage example: poll Status.json in an endless loop and print how long each
# pass takes (each pass includes two one-second sleeps).
if __name__ == "__main__":
    parser = StatusParser()

    while True:
        start_time = time.time()

        data = parser.get_cleaned_data()
        sleep(1)

        # Other flags can be queried the same way, e.g.:
        #docked = parser.get_flag(FlagsDocked, False)
        #landed = parser.get_flag(FlagsLanded, False)
        inmainship = parser.get_flag(FlagsInMainShip)
        print(f"Time: {(time.time() - start_time)}")

        #sc = parser.get_flag(FlagsSupercruise, False)
        #print(json.dumps(data, indent=4))
        #print(f"Supercruise: {sc}")
        sleep(1)
        #print("\n" * 10)