-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathtx_uhd.py
executable file
·241 lines (195 loc) · 7.37 KB
/
tx_uhd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Generate and TX samples using a set of waveforms, and waveform characteristics
"""
import argparse
import uhd
import time
import threading
import numpy as n
import matplotlib.pyplot as plt
import os
import signal
# internal modules related with the ionosonde
import sweep
import gps_lock as gl
import iono_logger
import iono_config
from datetime import datetime, timedelta
Exit = False # Used to signal an orderly exit
def orderlyExit(signalNumber, frame):
global Exit
# Signal that we want to exit after current sweep
print("Recieved SIGUSR1", flush=True)
Exit = True
def tune_at(u, t0, ic, f0=4e6, gpio_state=0):
"""
tune radio to frequency f0 at t0_full
use a timed command.
Toggle watchdog pin 1/16 on TX DB
Control antenna selector pin 2/16 on TX DB based on configuration
"""
u.clear_command_time()
t0_ts=uhd.libpyuhd.types.time_spec(t0)
print("tuning to %1.2f MHz at %1.2f" % (f0/1e6, t0_ts.get_real_secs()))
u.set_command_time(t0_ts)
tune_req=uhd.libpyuhd.types.tune_request(f0)
u.set_tx_freq(tune_req)
u.set_rx_freq(tune_req)
# toggle pin 1/16 for watchdog
watchdog = 0x00 if gpio_state == 0 else 0x01
# toggle pin 2/16 for antenna select
antenna_select = 0x02 if f0/1e6 > ic.antenna_select_freq else 0x00
print("Watchdog TX A GPIO={:02b}".format(watchdog | antenna_select))
gpio_line=0xff
u.set_gpio_attr("TXA", "OUT", watchdog | antenna_select, gpio_line, 0)
u.clear_command_time()
def tx_send(tx_stream, waveform, md, ic, timeout=11.0):
# this command will block until everything is in the transmit
# buffer.
tx_stream.send(waveform, md, timeout=(len(waveform)/float(ic.sample_rate))+1.0)
def rx_swr(u, t0, recv_buffer, f0, log, ic):
"""
Receive samples for a reflected power measurement
USRP output connected to input with 35 dB attenuation gives
9.96 dB reflected power.
"""
N=len(recv_buffer)
stream_args=uhd.usrp.StreamArgs("fc32", "sc16")
rx_stream=u.get_rx_stream(stream_args)
stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
stream_cmd.num_samps=N
stream_cmd.stream_now=False
stream_cmd.time_spec=uhd.types.TimeSpec(t0)
rx_stream.issue_stream_cmd(stream_cmd)
md=uhd.types.RXMetadata()
num_rx_samps=rx_stream.recv(recv_buffer, md, timeout=float(N/ic.sample_rate)+1.0)
pwr=n.mean(n.abs(recv_buffer)**2.0)
rx_stream=None
if pwr <= 0.0:
pwr=1e-99
refl_pwr_dBm=10.0*n.log10(pwr)+ic.reflected_power_cal_dB
log.log("reflected pwr %1.4f (MHz) %1.4f (dBm)" % (f0/1e6, refl_pwr_dBm))
def transmit_waveform(u, t0_full, waveform, swr_buffer, f0, log, ic):
"""
Transmit a timed burst
"""
t0_ts=uhd.libpyuhd.types.time_spec(n.uint64(t0_full), 0.0)
stream_args=uhd.usrp.StreamArgs("fc32", "sc16")
md=uhd.types.TXMetadata()
md.has_time_spec=True
md.time_spec=t0_ts
t0_full_dt = datetime.fromtimestamp(t0_full)
print("transmit start at %1.2f (%s)" % (t0_full, t0_full_dt.strftime("%FT%T.%f")[:-3]))
# wait for moment right before transmit
t_now=u.get_time_now().get_real_secs()
t_now_dt = datetime.fromtimestamp(t_now)
print("setup time %1.2f (%s)" % (t_now, t_now_dt.strftime("%FT%T.%f")[:-3]))
if t_now > t0_full:
log.log(
"Delayed start for transmit %1.2f (%s) %1.2f (%s)"
% (
t_now,
t_now_dt.strftime("%FT%T.%f")[:-3],
t0_full,
t0_full_dt.strftime("%FT%T.%f")[:-3]
)
)
if t0_full-t_now > 0.1:
time.sleep(t0_full-t_now-0.1)
try:
# transmit the code
tx_stream=u.get_tx_stream(stream_args)
tx_thread = threading.Thread(target=tx_send, args=(tx_stream, waveform, md, ic))
tx_thread.daemon=True # exit if parent thread exits
tx_thread.start()
# do an swr measurement
rx_thread = threading.Thread(target=rx_swr, args=(u, t0_full, swr_buffer, f0, log, ic))
rx_thread.daemon=True # exit if parent thread exits
rx_thread.start()
tx_thread.join()
rx_thread.join()
tx_stream=None
except Exception as e:
exit(0)
def main(config):
"""
The main loop for the ionosonde transmitter
"""
global Exit
# setup a logger
log = iono_logger.logger("tx-")
# this is the sweep configuration
s = ic.s
# register signals to be caught
signal.signal(signal.SIGUSR1, orderlyExit)
sample_rate=ic.sample_rate
# use the address configured for the transmitter
usrp = uhd.usrp.MultiUSRP("addr=%s" % (ic.tx_addr))
usrp.set_tx_rate(sample_rate)
usrp.set_rx_rate(sample_rate)
rx_subdev_spec=uhd.usrp.SubdevSpec(ic.tx_subdev_refl_pwr)
tx_subdev_spec=uhd.usrp.SubdevSpec(ic.tx_subdev)
usrp.set_tx_subdev_spec(tx_subdev_spec)
usrp.set_rx_subdev_spec(rx_subdev_spec)
# wait until GPS is locked, then align USRP time with global ref
gps_mon=None
if ic.require_gps == False:
usrp.set_clock_source("external");
usrp.set_time_source("external");
time_at_last_pps = usrp.get_time_last_pps().get_real_secs()
while time_at_last_pps == usrp.get_time_last_pps().get_real_secs():
time.sleep(0.1)
usrp.set_time_next_pps(uhd.libpyuhd.types.time_spec(int(n.ceil(time.time()))));
else:
gl.sync_clock(usrp, log, min_sync_time=ic.min_gps_lock_time)
gps_mon=gl.gpsdo_monitor(usrp, log, ic.gps_holdover_time)
# start with first frequency on tx and rx
tune_req=uhd.libpyuhd.types.tune_request(s.freq(0))
usrp.set_tx_freq(tune_req)
usrp.set_rx_freq(tune_req)
# hold SWR measurement about half of the transmit waveform length, so
# we have no timing issues
swr_buffer=n.empty(int(0.5*sample_rate*s.freq_dur), dtype=n.complex64)
# figure out when to start the cycle
t_now=usrp.get_time_now().get_real_secs()
t0=n.uint64(n.floor(t_now/(s.sweep_len_s))*s.sweep_len_s+s.sweep_len_s)
print("starting next sweep at %1.2f" % (s.sweep_len_s))
gpio_state=0
while not Exit:
t0_dt = datetime.fromtimestamp(t0)
log.log("Starting sweep at %1.2f (%s)" % (t0, t0_dt.strftime("%FT%T.%f")[:-3]))
for i in range(s.n_freqs):
f0, dt=s.pars(i)
print("f=%f code %s" % (f0/1e6, s.code(i)))
transmit_waveform(usrp, n.uint64(t0+dt), s.waveform(i), swr_buffer, f0, log, ic)
# tune to next frequency 0.0 s before end
next_freq_idx=(i+1) % s.n_freqs
tune_at(usrp, t0+dt+s.freq_dur-0.05, ic, f0=s.freq(next_freq_idx), gpio_state=gpio_state)
gpio_state=(gpio_state+1) % 2
# check that GPS is still locked.
if ic.require_gps:
gps_mon.check()
t0+=n.uint64(s.sweep_len_s)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'-c', '--config',
default="config/default.ini",
help='''Configuration file. (default: %(default)s)''',
)
parser.add_argument(
'-v', '--verbose',
action="store_true",
help='''Increase output verbosity. (default: %(default)s)''',
)
op = parser.parse_args()
ic = iono_config.get_config(
config=op.config,
write_waveforms=True,
quiet=not op.verbose
)
main(ic)