-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFlightPrediction.py
347 lines (295 loc) · 13 KB
/
FlightPrediction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#Prediction
import sys, os, time, math
import argparse
import json
import urllib
import urllib2
import string
#import xml.etree.ElementTree as ET
class PayloadStatus:
"""docstring for PayloadStatus"""
def __init__(self, location, time, speed = 0, ascentRate = 0, actualWeight = 1):
self.location = location
self.time = time
self.speed = speed
self.ascentRate = ascentRate
self.actualWeight = actualWeight;
self.apparentWeight = self.actualWeight;
def isFalling(self):
if (self.ascentRate < -4): ###Falling constant?
return True
return False
class Location:
"""docstring for Location"""
def __init__(self, lat, lng, alt):
#super(Location, self).__init__()
self.latitude = lat
self.longitude = lng
self.altitude = alt
def addDelta(self,lat,lng,alt):
self.latitude += lat
self.longitude += lng
self.altitude += alt
#wind vectors by altitude
class Wind:
"""docstring for Wind"""
def __init__(self, velocity, bearing, altitude):
self.velocity = velocity
self.bearing = bearing
self.altitude = altitude
def getLatVelocity(self):
return math.sin(math.radians(self.bearing)) * self.velocity
def getLngVelocity(self):
return math.cos(math.radians(self.bearing)) * self.velocity
class WindString:
"""docstring for WindString"""
winds = []
"""docstring for Wind"""
def __init__(self):
pass
def appendWind(self, newWind):
#print newWind.velocity
if newWind.velocity > .007: # .0019:
#print newWind.velocity
newWind.velocity = 0
if len(self.winds) == 0:
self.winds.append(newWind)
else:
i = 0
while self.winds[i].altitude > newWind.altitude and i < len(self.winds):
i += 1
self.winds.insert(i,newWind)
def getWindAtAlt(self, targetAltitude, t=0):
if len(self.winds) > 1:
for i in range(len(self.winds)):
if self.winds[i].altitude < targetAltitude:
return interpolateWind(self.winds[i-1], self.winds[i], targetAltitude)
return self.winds[-1]
def interpolateWind(lowWind, highWind, targetAltitude):
#this is where improvements will be made. This currently only lineraly interpolates between the two vectors
ratioOfHighToLow = (targetAltitude - lowWind.altitude) / (highWind.altitude - lowWind.altitude)
latDelta = (lowWind.getLatVelocity() * (1 - ratioOfHighToLow) ) + (highWind.getLatVelocity() * ratioOfHighToLow)
lngDelta = (lowWind.getLngVelocity() * (1 - ratioOfHighToLow) ) + (highWind.getLngVelocity() * ratioOfHighToLow)
iVelocity = math.sqrt(latDelta*latDelta + lngDelta*lngDelta) # not what we are doing: highWind.velocity * ratioOfHighToLow + lowWind.velocity * (1 - ratioOfHighToLow)
iBearing = computeBearing(latDelta,lngDelta)
return Wind(iVelocity, iBearing, targetAltitude)
#return Wind(lowWind.velocity, lowWind.bearing, targetAltitude) # no interpolation
class PayloadPath:
"""docstring for PayloadPath"""
global adaptiveWeightCorrection
global payloadWeight
def __init__(self):
self.path = []
def predictDecsentPath(self, windField, currentStatus):
#Predict Decsent path (lat/lng, alt at specific times)
self.path.append(currentStatus)
def my_range(start, end, step):
while start > end:
yield start
start -= step
#descent prediction only
altDelta = 1.0 # we are really predicting 1m at a time - MEGA RESOLUTION!!!
groundLevel = 1595 #m # NEED FUNCTION to get ground alt at guessed landing point
for x in my_range(currentStatus.location.altitude, groundLevel, altDelta): #lets try this range thing
timeDelta = altDelta*self.descentTime(self.path[-1])
latDelta = windField.getWindAtAlt(x).getLatVelocity() * (timeDelta) #knots / sec
lngDelta = windField.getWindAtAlt(x).getLngVelocity() * (timeDelta) #knots / sec
#print timeDelta
#speed = getWindAtAlt(x).velocity #knots - not taking verticle speed into account - useless but kinda cool I guess
#append new +=lat/lng, alt, time, to path array
nextLocation = Location(self.path[-1].location.latitude, self.path[-1].location.longitude, self.path[-1].location.altitude)
nextLocation.addDelta(latDelta,lngDelta,((-1)*altDelta))
nextTime = self.path[-1].time + timeDelta
#print nextLocation.latitude, nextLocation.longitude, nextLocation.altitude, nextTime
self.path.append(PayloadStatus(nextLocation,nextTime,0,0,self.path[-1].apparentWeight))
def descentTime(self, payloadStatus):
#Predict decent rate
#Adapt decent rate time traveling through each altitude zone
x = payloadStatus.location.altitude
return (1 / descentVelocity(x, payloadStatus.apparentWeight))
def apparentWeightCorrection(point1, point2):
additionalWeight = 0
if (point1.isFalling()):
averageDescentRate = (((point2.ascentRate - point1.ascentRate) / 2.0) + point1.ascentRate) * -1.0
averageAltitude = (((point2.location.altitude - point1.location.altitude) / 2.0) + point1.location.altitude)
additionalWeight = (pow(averageDescentRate,2) * estimatedAirDensityAtAlt(averageAltitude)) / combinedDragCoefficient(averageAltitude) - point1.actualWeight
if additionalWeight > 20:
#print "WOAH THERE - You are falling way faster than expected! I'll back the estimated added weight to 20lbs down from" + str(additionalWeight)
additionalWeight = 20
if additionalWeight < -20:
#print "WOAH THERE - You are falling way slower than expected! I'll back the estimated added weight to -20lbs down from" + str(additionalWeight)
additionalWeight = -20
return additionalWeight
def combinedDragCoefficient(altitude):
if (altitude > 17000):
return 12.0 #chute not open drag ~=.? not much
elif (altitude > 8000): #26.2ft
return 4.0 #chute not open drag ~=.5
else:
return 2.996 #chute open drag =.75 at 4m^2 chute
def estimatedAirTempAtAlt(meters):
#MAGIC based off NASA's standard day
if (meters<11000):
celcius = 15.04 - 0.00649 * meters
elif (meters<25000):
celcius = -56.46
else:
celcius = -131.21 + 0.00299 * meters
return celcius
def estimatedAirPressureAtAlt(meters, celcius=None):
#MAGIC based off NASA's standard day
#init to sea level for reference
if celcius == None:
celcius = estimatedAirTempAtAlt(meters)
if (meters<11000):
kPa = 101.29 * math.pow(((celcius + 273.1)/288.08), 5.256)
elif (meters<25000):
kPa = 22.65 * math.exp(1.73 - 0.000157 * meters)
else:
kPa = 2.488 * math.pow(((celcius + 273.1)/ 216.6), -11.388)
return kPa
def estimatedAirDensityAtAlt(meters, celcius=None):
#MAGIC based off NASA's standard day
#init to sea level for reference
if celcius == None:
celcius = estimatedAirTempAtAlt(meters)
kPa = estimatedAirPressureAtAlt(meters, celcius)
kgPm3 = airDensity(kPa,celcius)
#TODO Sanity check here and dynamic programming
return kgPm3
def airDensity(kPa, celcius):
return kPa / (0.2869 * (celcius + 273.1))
def testAir():
i = 0
while i < 40000:
i += 1000
print str(i) +" m = "+ str(estimatedAirDensityAtAlt(i))
def descentVelocity(altitude,apparentWeight):
return math.sqrt((combinedDragCoefficient(altitude) * apparentWeight) / estimatedAirDensityAtAlt(altitude))
def computeBearing(lat,lng):
bearing = 0
if lng == 0:
if lat > 0:
bearing = 90
elif lng < 0:
bearing = 270
else:
if lat == 0:
if lng < 0:
bearing == 180
else:
bearing = math.degrees(math.atan(lat/lng))
if bearing < 0:
bearing += 360
return bearing
def measuredWindFromFlightPoints(point1, point2):
timeStep = point2.time - point1.time
#print timeStep
latDelta = point2.location.latitude - point1.location.latitude
lngDelta = point2.location.longitude - point1.location.longitude
avgAltitude = (point2.location.altitude - point1.location.altitude)/2 + point1.location.altitude #average altitude beteween data addMeasuredWindFromFlightPoint
bearing = computeBearing(latDelta,lngDelta)
velocity = math.sqrt(latDelta * latDelta + lngDelta * lngDelta) / timeStep #close enough to knotts
#print velocity, bearing, avgAltitude, timeStep
return Wind(velocity, bearing, avgAltitude)
#this belongs somewhere else
def findAscentRate(alt,time,oldPoint):
timeStep = time - oldPoint.time
ascentRate = (alt - oldPoint.location.altitude) / timeStep #in +/- m/s
#print ascentRate
return ascentRate
#this belongs somewhere else
#future self: please make this average the recent ascent rates within a reasonable timespan to get a more valid ascent rate
def findRecentAverageAscentRate(point1, point2):
return (point1.ascentRate + point2.ascentRate)/2.0
def dd2dms(decDegrees):
degrees = int(decDegrees)
decMinutes = (decDegrees - degrees) * 60.0
minutes = int(decMinutes)
decSeconds = (decMinutes - minutes) * 60.0
seconds = int(decSeconds)
tenths = (decSeconds - seconds) * 10
return ({'Degrees':degrees, 'Minutes':minutes, 'Seconds':int(decSeconds*10000)/10000.0, 'Tenths':tenths})
def formatDMS(dms):
return (str(dms['Degrees']) + "* " + str(dms['Minutes']) + "' " + str(dms['Seconds']) + '"')
def arguments():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='Run a landing predicton for a balloon flight.')
parser.add_argument(
'--csv',
action='store',
help='CSV file')
parser.add_argument(
'--mongo',
action='store',
help='EDGE ID to query mongo for')
parser.add_argument(
'--nmea',
action='store',
help='NMEA file')
args = parser.parse_args()
return args
def main():
args = arguments()
if args.csv:
from import_csv import get_csv_points
flightPoints = get_csv_points(args.csv)
elif args.nmea:
from import_nmea import get_points
flightPoints = get_nmea_points(args.nmea)
elif args.mongo:
from import_mongo import get_mongo_points
flightPoints = get_mongo_points(args.mongo)
else:
flightPoints = sys.argv[1]
runPrediction(flightPoints)
def runPrediction(flightPoints):
''' function does
:param flightPoints: array of flight data points like [{lat:#DD#, lng:#DD#, alt:#m#, time:#s#}]
'''
payloadWeight = 11
initialLocation = Location(5,5,5)
windField = WindString()
#populate windField
windField.appendWind(Wind(0,0,0))
flight = []
try:
for i in range(0,len(flightPoints),1):
dp = flightPoints[i]
time = dp['time']
speed = 0#dp.speed
lat = dp['latitude']
lng = dp['longitude']
alt = dp['altitude']
if len(flight) == 0: #initialize that first one
flight.append(PayloadStatus(Location(lat,lng,alt), time, speed, 0, payloadWeight))
if time != flight[-1].time:
#print lat,lng,alt,speed,time
flight.append(PayloadStatus(Location(lat,lng,alt), time, speed, findAscentRate(alt,time,flight[-1]), payloadWeight))
if len(flight) > 3:
windField.appendWind(measuredWindFromFlightPoints(flight[-2],flight[-1]))
#print flight[-1].time - flight[-2].time
#print len(windField.winds)
#else: #What was I thinking?
# flight.append(PayloadStatus(Location(lat,lng,alt), time, speed, flight[-1].ascentRate))
#for w in windField.winds:
# print w.velocity * 100000, " knots at ", w.bearing, " degrees at", w.altitude, "meters"
if len(flight) > 2:
adaptiveWeightCorrection = apparentWeightCorrection(flight[-2],flight[-1])
#print adaptiveWeightCorrection
flight[-1].apparentWeight = flight[-1].actualWeight + adaptiveWeightCorrection
descentPrediction = PayloadPath()
descentPrediction.predictDecsentPath(windField, flight[-1])
landingPoint = {
#'edgeId':edgeId,
'longitude':descentPrediction.path[-1].location.longitude,
'latitude':descentPrediction.path[-1].location.latitude
}
#sendPrediction(landingPoint);
print json.dumps(landingPoint)
#print {'longitude':formatDMS(dd2dms(landingPoint['longitude'])), 'latitude':formatDMS(dd2dms(landingPoint['latitude']))}
except IOError as e: # Usually just means that the xml isn't online yet.
return
if __name__ == '__main__':
main()