forked from ukyg9e5r6k7gubiekd6/gpsd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgpssim.py
323 lines (284 loc) · 11.2 KB
/
gpssim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# This file is Copyright (c) 2010 by the GPSD project
# SPDX-License-Identifier: BSD-2-clause
"""
A GPS simulator.
This is proof-of-concept code, not production ready; some functions are stubs.
"""
import math
import random
import sys
import time
# pylint wants local modules last
try:
import gps
import gpslib
except ImportError as e:
sys.stderr.write(
"gpssim.py: can't load Python gps libraries -- check PYTHONPATH.\n")
sys.stderr.write("%s\n" % e)
sys.exit(1)
# First, the mathematics. We simulate a moving viewpoint on the Earth
# and a satellite with specified orbital elements in the sky.
class ksv(object):
"Kinematic state vector."
def __init__(self, time=0, lat=0, lon=0, alt=0, course=0,
speed=0, climb=0, h_acc=0, v_acc=0):
self.time = time # Seconds from epoch
self.lat = lat # Decimal degrees
self.lon = lon # Decimal degrees
self.alt = alt # Meters
self.course = course # Degrees from true North
self.speed = speed # Meters per second
self.climb = climb # Meters per second
self.h_acc = h_acc # Meters per second per second
self.v_acc = v_acc # Meters per second per second
def next(self, quantum=1):
"State after quantum."
self.time += quantum
avspeed = (2 * self.speed + self.h_acc * quantum) / 2
avclimb = (2 * self.climb + self.v_acc * quantum) / 2
self.alt += avclimb * quantum
self.speed += self.h_acc * quantum
self.climb += self.v_acc * quantum
distance = avspeed * quantum
# Formula from <http://williams.best.vwh.net/avform.htm#Rhumb>
# Initial point cannot be a pole, but GPS doesn't work at high.
# latitudes anyway so it would be OK to fail there.
# Seems to assume a spherical Earth, which means it's going
# to have a slight inaccuracy rising towards the poles.
# The if/then avoids 0/0 indeterminacies on E-W courses.
tc = gps.Deg2Rad(self.course)
lat = gps.Deg2Rad(self.lat)
lon = gps.Deg2Rad(self.lon)
lat += distance * math.cos(tc)
dphi = math.log(math.tan(lat / 2 + math.pi / 4) /
math.tan(self.lat / 2 + math.pi / 4))
if abs(lat - self.lat) < math.sqrt(1e-15):
q = math.cos(self.lat)
else:
q = (lat - self.lat) / dphi
dlon = -distance * math.sin(tc) / q
self.lon = gps.Rad2Deg(math.mod(lon + dlon + math.pi, 2 * math.pi) -
math.pi)
self.lat = gps.Rad2Deg(lat)
# Satellite orbital elements are available at:
# <http://www.ngs.noaa.gov/orbits/>
# Orbital theory at:
# <http://www.wolffdata.se/gps/gpshtml/anomalies.html>
class satellite(object):
"Orbital elements of one satellite. PRESENTLY A STUB"
def __init__(self, prn):
self.prn = prn
def position(self, time):
"Return right ascension and declination of satellite,"
return
# Next, the command interpreter. This is an object that takes an
# input source in the track description language, interprets it into
# sammples that might be reported by a GPS, and calls a reporting
# class to generate output.
class gpssimException(BaseException):
def __init__(self, message, filename, lineno):
BaseException.__init__(self)
self.message = message
self.filename = filename
self.lineno = lineno
def __str__(self):
return '"%s", %d:' % (self.filename, self.lineno)
class gpssim(object):
"Simulate a moving sensor, with skyview."
active_PRNs = list(range(1, 24 + 1)) + [134, ]
def __init__(self, outfmt):
self.ksv = ksv()
self.ephemeris = {}
# This sets up satellites at random. Not really what we want.
for prn in gpssim.active_PRNs:
for (prn, _satellite) in list(self.ephemeris.items()):
self.skyview[prn] = (random.randint(-60, +61),
random.randint(0, 359))
self.have_ephemeris = False
self.channels = {}
self.outfmt = outfmt
self.status = gps.STATUS_NO_FIX
self.mode = gps.MODE_NO_FIX
self.validity = "V"
self.satellites_used = 0
self.filename = None
self.lineno = 0
def parse_tdl(self, line):
"Interpret one TDL directive."
line = line.strip()
if "#" in line:
line = line[:line.find("#")]
if line == '':
return
fields = line.split()
command = fields[0]
if command == "time":
self.ksv.time = gps.isotime(fields[1])
elif command == "location":
(self.lat, self.lon, self.alt) = list(map(float, fields[1:]))
elif command == "course":
self.ksv.time = float(fields[1])
elif command == "speed":
self.ksv.speed = float(fields[1])
elif command == "climb":
self.ksv.climb = float(fields[1])
elif command == "acceleration":
(self.ksv.h_acc, self.ksv.h_acc) = list(map(float, fields[1:]))
elif command == "snr":
self.channels[int(fields[1])] = float(fields[2])
elif command == "go":
self.go(int(fields[1]))
elif command == "status":
try:
code = fields[1]
self.status = {"no_fix": 0, "fix": 1, "dgps_fix": 2}[
code.lower()]
except KeyError:
raise gpssimException("invalid status code '%s'" % code,
self.filename, self.lineno)
elif command == "mode":
try:
code = fields[1]
self.status = {"no_fix": 1, "2d": 2, "3d": 3}[code.lower()]
except KeyError:
raise gpssimException("invalid mode code '%s'" % code,
self.filename, self.lineno)
elif command == "satellites":
self.satellites_used = int(fields[1])
elif command == "validity":
self.validity = fields[1]
else:
raise gpssimException("unknown command '%s'" % fields[1],
self.filename, self.lineno)
# FIX-ME: add syntax for ephemeris elements
self.lineno += 1
def filter(self, inp, outp):
"Make this a filter for file-like objects."
self.filename = input.name
self.lineno = 1
self.output = outp
for line in inp:
self.execute(line)
def go(self, seconds):
"Run the simulation for a specified number of seconds."
for i in range(seconds):
next(self.ksv)
if self.have_ephemeris:
self.skyview = {}
for (prn, satellite) in list(self.ephemeris.items()):
self.skyview[prn] = satellite.position(i)
self.output.write(self.gpstype.report(self))
# Reporting classes need to have a report() method returning a string
# that is a sentence (or possibly several sentences) reporting the
# state of the simulation. Presently we have only one, for NMEA
# devices, but the point of the architecture is so that we could simulate
# others - SirF, Evermore, whatever.
MPS_TO_KNOTS = 1.9438445 # Meters per second to knots
class NMEA(object):
"NMEA output generator."
def __init__(self):
self.sentences = ("RMC", "GGA",)
self.counter = 0
def add_checksum(self, mstr):
"Concatenate NMEA checksum and trailer to a string"
csum = 0
for (i, c) in enumerate(mstr):
if i == 0 and c == "$":
continue
csum ^= ord(c)
mstr += "*%02X\r\n" % csum
return mstr
def degtodm(self, angle):
"Decimal degrees to GPS-style, degrees first followed by minutes."
(fraction, _integer) = math.modf(angle)
return math.floor(angle) * 100 + fraction * 60
def GGA(self, sim):
"Emit GGA sentence describing the simulation state."
tm = time.gmtime(sim.ksv.time)
gga = "$GPGGA,%02d%02d%02d,%09.4f,%c,%010.4f,%c,%d,%02d," % (
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
sim.status,
sim.satellites_used)
# HDOP calculation goes here
gga += ","
if sim.mode == gps.MODE_3D:
gga += "%.1f,M" % self.ksv.lat
gga += ","
gga += "%.3f,M," % gpslib.wg84_separation(sim.ksv.lat, sim.ksv.lon)
# Magnetic variation goes here
# gga += "%3.2f,M," % mag_var
gga += ",,"
# Time in seconds since last DGPS update goes here
gga += ","
# DGPS station ID goes here
return self.add_checksum(gga)
def GLL(self, sim):
"Emit GLL sentence describing the simulation state."
tm = time.gmtime(sim.ksv.time)
gll = "$GPLL,%09.4f,%c,%010.4f,%c,%02d%02d%02d,%s," % (
self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
sim.validity, )
# FAA mode indicator could go after these fields.
return self.add_checksum(gll)
def RMC(self, sim):
"Emit RMC sentence describing the simulation state."
tm = time.gmtime(sim.ksv.time)
rmc = \
"GPRMC,%02d%02d%02d,%s,%09.4f,%c,%010.4f,%c,%.1f,%02d%02d%02d," % (
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
sim.validity,
self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
sim.course * MPS_TO_KNOTS,
tm.tm_mday,
tm.tm_mon,
tm.tm_year % 100)
# Magnetic variation goes here
# rmc += "%3.2f,M," % mag_var
rmc += ",,"
# FAA mode goes here
return self.add_checksum(rmc)
def ZDA(self, sim):
"Emit ZDA sentence describing the simulation state."
tm = time.gmtime(sim.ksv.time)
zda = "$GPZDA,%02d%2d%02d,%02d,%02d,%04d" % (
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
tm.tm_mday,
tm.tm_mon,
tm.tm_year, )
# Local zone description, 00 to +- 13 hours, goes here
zda += ","
# Local zone minutes description goes here
zda += ","
return self.add_checksum(zda)
def report(self, sim):
"Report the simulation state."
out = ""
for sentence in self.sentences:
if isinstance(sentence, tuple):
(interval, sentence) = sentence
if self.counter % interval:
continue
out += getattr(self, sentence)(*[sim])
self.counter += 1
return out
# The very simple main line.
if __name__ == "__main__":
try:
gpssim(NMEA).filter(sys.stdin, sys.stdout)
except gpssimException as e:
sys.stderr.write(repr(e) + "\n")
# gpssim.py ends here.