forked from labrad/servers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathghz_fpga_server.py
2221 lines (1958 loc) · 92.3 KB
/
ghz_fpga_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (C) 2007, 2008, 2009, 2010 Matthew Neeley
# Copyright (C) 2010, 2011, 2012, 2013
# 2014 Daniel Sank, James Wenner
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# CHANGELOG:
#
# 2011 November 30 - Daniel Sank / Jim Wenner
#
# Run Sequence setting now only tries to get ADC demodulation ranges when
# getTimingData is True. If getTimingData is False, no data is extracted,
# so it's impossible to store the I and Q ranges.
#
# 2011 November 29 - Jim Wenner
#
# Removed adc_recalibrate from adc_bringup since this may randomize order of
# I, Q outputs.
#
# 2011 November 16 - Dan Sank/Jim Wenner
#
# Fixed documentation for dac_lvds and dac_fifo.
# Changed return type tag for dac_bringup. Now an array of clusters instead of
# a cluster of clusters.
#
# For dac and adc device objects, changed device.params to device.buildParams.
# This was done because we now have _build_ specific, and _board_ specific
# parameters stored in the registry and we need to distinguish these in the
# variable names.
#
# In other words, build specific data are in device.buildParams
# and board specific data are in device.boardParams
#
# 2011 November 10 - Jim Wenner
#
# Fixed bug where, in list_dacs and list_adcs, looked for (name, id) in devices
# when checking board groups even though only name present by this point.
#
# 2011 November 4 - Daniel Sank
#
# The code around line 1172 which read "c[runner.dev]['ranges']=runner.ranges"
# didn't work because I had never assigned runner.ranges. This is now assigned
# near line 601.
#
# 2011 November 2 - Jim Wenner
#
# Moved bringup code into the server. This was done to help keep bringup and
# server code synchronized with each other.
#
# DAC bringup now has signed data as default. Added
# functions to return list of DACs or of ADCs.
#
# In setFIFO, removed reset of LVDS sample delay. Changed how check FIFO
# counter to ensure final value same as what thought setting to. In both
# setFIFO and setLVDS, added success/ failure checks and modified return
# parameters. Changed default FIFO counter from 3 to value in registry provided
# for each board. Changed default setLVDS behavior from optimizing LVDS SD to
# getting SD from board-specific registry key while adding option to use
# optimal SD instead. setLVDS returns MSD, MHD even if sample delay specified.
#
# Board specific registry keys are located in ['Servers', 'GHz FPGAs'] and are
# of form:
# dacN=[('fifoCounter', 3), ('lvdsSD', 3), ('lvdsPhase', 180)]
#
# 2011 February 9 - Daniel Sank
# Removed almost all references to hardcoded hardware parameters, for example
# the various SRAM lengths. These values are now board specific.
# As an example of how this is implemented, we used to have something like
# this:
# def adc_filter_func(self, c, bytes, stretchLen=0, stretchAt=0):
# assert len(bytes) <= FILTER_LEN, 'Filter function max length is %d' \
# % FILTER_LEN
# dev = self.selectedADC(c)
# ...
# where FILTER_LEN was a global constant. We now instead have this:
# def adc_filter_func(self, c, bytes, stretchLen=0, stretchAt=0):
# dev = self.selectedADC(c)
# assert len(bytes) <= dev.buildParams['FILTER_LEN'], 'Filter function max\
# length is %d' % dev.buildParams['FILTER_LEN']
# ...
# so that the filter length is board specific. These board specific parameters
# are loaded by the board objects when they are created, See dac.py and adc.py
# for details on how these parameters are loaded.
# + DOCUMENTATION
#
# Communication between the computer and the FPGA boards works over ethernet.
# This server and the associated board type definition files dac.py and adc.py
# abstract away this ethernet communication. This means that you don't have
# to explicitly tell the direct ethernet server to send packets to the boards.
# Instead you call, for example, this server's Memory command and the server
# will build the appropriate packets for the board you've selected and the
# memory sequence you want to send. No packets are actually sent to the boards
# until you tell them to run using one of the following commands:
# DAC Run SRAM - Runs SRAM on one board without waiting for a daisychain
# pulse.
# ADC Run Demod - Runs ADC demod mode on one board without waiting for a
# daisychain pulse.
# ADC Run Average - Runs ADC average mode on one board without waiting for a
# daisychain pulse.
# Run Sequence - Runs multiple boards synchronously using the daisychain
# (DACs and ADCs).
# When one of the one-off (no daisychain) commands is sent, whichever DAC
# or ADC you have selected in your context will run and return data as
# appropriate. The use of Run Sequence is slightly more complicated. See below.
#
# ++ USING RUN SEQUENCE
# The Run Sequence command is used to run multiple boards synchronously using
# daisychain pulses to trigger SRAM execution. Here are the steps to use it:
# 1. "Daisy Chain" to specify what boards will run
# 2. "Timing Order" to specify which boards' data you will collect
# 3. Set up DACS - for each DAC you want to run call Select Device and then:
# a. SRAM or SRAM dual block
# b. Memory
# c. Start Delay
# 4. Set up ADCs - for each ADC you want to run call Select Device and then:
# a. ADC Run Mode
# b. ADC Filter Func (set this even if you aren't using demodulation mode)
# c. ADC Demod Phase
# d. ADC Trig Magnitude
# e. Start Delay
# For information on the format of the data returned by Run Sequence see its
# docstring.
#
# ++ REGISTRY KEYS
# In order for the server to set up the board groups and fpga devices properly
# there are a couple of registry entries that need to be set up. Registry keys
# for this server live in ['', 'Servers', 'GHz FPGAs']
#
# boardGroups: *(ssw*(sw)), [(groupName, directEthernetServername, portNumber,
# [(boardName, daisychainDelay), ...]), ...]
# This key tells the server what boards groups should exist, what direct
# ethernet server controls that group, which ethernet port is connected to the
# boards (via an ethernet switch) and what boards exist on each group. Board
# names should be of the form "DAC N" or "ADC N" where N is the number
# determined by the DIP switches on the board. The number after each board name
# is the number of clock cycles that board should wait after receiving a
# daisychain pulse before starting to run its SRAM.
#
# dacBuildX: *(s?), [(parameterName, value), (parameterName, value), ...]
# adcBuildX: *(s?), [(parameterName, value), (parameterName, value), ...]
# When FPGA board objects are created they read the registry to find hardware
# parameter values. For example, the DAC board objects need to know how long
# their SRAM memory is, and each board may have a different value depending on
# its specific FPGA chip. Details, lists of necessary parameters and example
# values for each board type are given in dac.py and adc.py
#
# dacN: *(s?), [(parameterName, value), (parameterName, value), ...] Parameters
# which are specific to individual boards. This is used for the default FIFO
# counter, LVDS SD, etc. See examples in dac.py
from __future__ import with_statement
"""
TODO
cmdTime_cycles does not properly estimate sram length
"""
"""
### BEGIN NODE INFO
[info]
name = GHz FPGAs
version = 5.2.0
description = Talks to DAC and ADC boards
[startup]
cmdline = %PYTHON% %FILE%
timeout = 20
[shutdown]
message = 987654321
timeout = 20
### END NODE INFO
"""
import itertools
import logging
import os
import random
import struct
import sys
import time
import numpy as np
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
from labrad import types as T, units as U
from labrad.devices import DeviceServer
from labrad.server import setting
from labrad.units import Unit, Value
import labrad.util
import fpgalib.adc as adc
import fpgalib.dac as dac
import fpgalib.fpga as fpga
from fpgalib.util import TimedLock, LoggingPacket
# The logging level is set at the bottom of the file where the server starts.
# To get additional info about what the server is doing (i.e. to see if it
# gets to a certain part of run_sequence), change the logging level to
# logging.INFO. To print the Direct Etherenet packets, set it to
# logging.DEBUG.
def timeString():
    """Return the current local time as 'YYYY M D H M S' (space separated,
    no zero padding)."""
    now = time.localtime()
    fields = (now.tm_year, now.tm_mon, now.tm_mday,
              now.tm_hour, now.tm_min, now.tm_sec)
    return ' '.join(str(field) for field in fields)
# When True, direct ethernet packets built in makeRunPackets are wrapped in
# LoggingPacket so their contents can be inspected for debugging.
LOGGING_PACKET = False
# Number of pages in the pipelined memory scheme: data for the next sequence
# can be loaded into one page while another page is executing.
NUM_PAGES = 2
# Bit flags for building I2C commands. Presumably these mark readback,
# acknowledge, and end-of-operation bits in DAC I2C words -- they are not
# used in this part of the file, so confirm against dac.py before relying
# on these descriptions.
I2C_RB = 0x100
I2C_ACK = 0x200
I2C_RB_ACK = I2C_RB | I2C_ACK
I2C_END = 0x400
# TODO: Remove the constants from above and put them in the registry to be
# read by individual DAC board instances. See DacDevice.connect to see how
# this is done
# TODO: make sure paged operations (datataking) don't conflict with e.g.
# bringup
# - want to do this by having two modes for boards, either 'test' mode
#   (when a board does not belong to a board group) or 'production' mode
#   (when a board does belong to a board group). It would be nice if boards
#   could be dynamically moved between groups, but we'll see about that...
# TODO: store memory and SRAM as numpy arrays, rather than lists and strings,
# respectively
# TODO: run sequences to verify the daisy-chain order automatically
# TODO: when running adc boards in demodulation (streaming mode), check
# counters to verify that there is no packet loss
# TODO: think about whether page selection and pipe semaphore can interact
# badly to slow down pipelining
class TimeoutError(Exception):
    """Raised when one or more FPGA boards fail to respond in time."""
class BoardGroup(object):
    """Manages a group of GHz DAC boards that can be run simultaneously.

    All the fpga boards must be daisy-chained to allow for synchronization,
    and also must be connected to the same network card. Currently, one
    board group is created automatically for each detected network card.

    Only one sequence at a time can be run on a board group, but memory
    and SRAM updates can be pipelined so that while a sequence is running
    on some set of the boards in the group, new sequence data for the next
    point can be uploaded.
    """
    def __init__(self, fpgaServer, directEthernetServer, port):
        """Create a board group.

        fpgaServer: the FPGA server instance that owns this group.
        directEthernetServer: client wrapper for the direct ethernet
            server that talks to the boards in this group.
        port: ethernet adapter (port) number on the direct ethernet server.
        """
        self.fpgaServer = fpgaServer
        self.directEthernetServer = directEthernetServer
        self.port = port
        self.ctx = None  # Direct ethernet context; allocated in init().
        # One semaphore slot per page bounds the pipeline depth.
        self.pipeSemaphore = defer.DeferredSemaphore(NUM_PAGES)
        # Cycles through page numbers so successive runs alternate pages.
        self.pageNums = itertools.cycle(range(NUM_PAGES))
        self.pageLocks = [TimedLock() for _ in range(NUM_PAGES)]
        self.runLock = TimedLock()
        self.readLock = TimedLock()
        self.setupState = set()  # Setup state left by the most recent run.
        self.runWaitTimes = []  # Recent run-lock wait times, in seconds.
        self.prevTriggers = 0  # Trigger count expected from previous run.
@inlineCallbacks
def init(self):
"""Set up the direct ethernet server in our own context."""
self.ctx = self.directEthernetServer.context()
p = self.directEthernetServer.packet(context=self.ctx)
p.connect(self.port)
yield p.send()
@inlineCallbacks
def shutdown(self):
"""Clean up when this board group is removed."""
# expire our context with the manager
cxn = self.directEthernetServer._cxn
servers = yield cxn.manager.servers()
server_ids = set(id for id, name in servers)
if self.directEthernetServer.ID in server_ids:
yield cxn.manager.expire_context(
self.directEthernetServer.ID, context=self.ctx)
def configure(self, name, boards):
"""Update configuration for this board group."""
self.name = name
self.boardOrder = ['{} {}'.format(name, boardName) for
(boardName, delay) in boards]
self.boardDelays = [delay for (boardName, delay) in boards]
    @inlineCallbacks
    def detectBoards(self):
        """Detect boards on the ethernet adapter managed by this board group.

        The autodetect operation is guarded by board group locks so that it
        will not conflict with sequences running on this board group.

        Returns (via deferred) a list of (devName, args) tuples for every
        DAC and ADC board that responded to the detection pings.
        """
        try:
            # Acquire all locks so we can ping boards without interfering with
            # board group operations.
            for i in xrange(NUM_PAGES):
                yield self.pipeSemaphore.acquire()
            for pageLock in self.pageLocks:
                yield pageLock.acquire()
            yield self.runLock.acquire()
            yield self.readLock.acquire()
            # Detect each board type in its own context.
            detections = [self.detectDACs(), self.detectADCs()]
            answer = yield defer.DeferredList(detections, consumeErrors=True)
            found = []
            for success, result in answer:
                if success:
                    found.extend(result)
                else:
                    print 'autodetect error:'
                    result.printTraceback()
            # Clear detection packets which may be buffered in device contexts.
            # TODO: check that this actually clears packets.
            # NOTE(review): these clear requests are fired but never yielded
            # on, so errors from them are silently dropped -- confirm this
            # best-effort behavior is intended.
            devices = self.devices()
            clears = []
            for dev in devices:
                clears.append(dev.clear().send())
            returnValue(found)
        finally:
            # Release all locks once we're done with autodetection.
            for i in xrange(NUM_PAGES):
                self.pipeSemaphore.release()
            for pageLock in self.pageLocks:
                pageLock.release()
            self.runLock.release()
            self.readLock.release()
def detectDACs(self, timeout=1.0):
"""Try to detect DAC boards on this board group."""
def callback(src, data):
board = int(src[-2:], 16)
build = dac.DAC.readback2BuildNumber(data)
devName = '{} DAC {}'.format(self.name, board)
args = (devName, self, self.directEthernetServer, self.port, board,
build)
return (devName, args)
macs = [dac.DAC.macFor(board) for board in range(256)]
return self._doDetection(macs, dac.DAC.regPing(),
dac.DAC.READBACK_LEN, callback)
def detectADCs(self, timeout=1.0):
"""Try to detect ADC boards on this board group."""
def callback(src, data):
board = int(src[-2:], 16)
build = adc.ADC.readback2BuildNumber(data)
devName = '{} ADC {}'.format(self.name, board)
args = (devName, self, self.directEthernetServer, self.port, board,
build)
return (devName, args)
macs = [adc.ADC.macFor(board) for board in range(256)]
return self._doDetection(macs, adc.ADC.regPing(),
adc.ADC.READBACK_LEN, callback)
    @inlineCallbacks
    def _doDetection(self, macs, packet, respLength, callback, timeout=1.0):
        """
        Try to detect boards at the specified mac addresses.

        For each response of the correct length received within the timeout
        from one of the given mac addresses, the callback function will be
        called and should return data to be added to the list of found
        devices.

        macs: list of MAC address strings to ping.
        packet: register packet (written with .tostring()) sent to each MAC.
        respLength: required length of a valid response.
        callback: function (src, data) -> device info for a found board.
        timeout: seconds to wait for responses.
        """
        try:
            # Detection runs in its own throwaway context so buffered
            # responses do not pollute the board group's main context.
            ctx = self.directEthernetServer.context()
            # Prepare and send detection packets.
            # Note: the packet is built without a context; the context is
            # supplied below at send time instead.
            p = self.directEthernetServer.packet()
            p.connect(self.port)
            p.require_length(respLength)
            p.timeout(T.Value(timeout, 's'))
            p.listen()
            for mac in macs:
                p.destination_mac(mac)
                p.write(packet.tostring())
            yield p.send(context=ctx)
            # Listen for responses.
            start = time.time()
            found = []
            # Stop once every pinged MAC has answered or the timeout expires.
            while (len(found) < len(macs)) and (time.time() - start < timeout):
                try:
                    ans = yield self.directEthernetServer.read(1, context=ctx)
                    src, dst, eth, data = ans[0]
                    if src in macs:
                        devInfo = callback(src, data)
                        found.append(devInfo)
                except T.Error as e:
                    logging.error('timeout exception: {}'.format(e))
                    break  # Read timeout.
            returnValue(found)
        finally:
            # Expire the detection context.
            cxn = self.directEthernetServer._cxn
            yield cxn.manager.expire_context(self.directEthernetServer.ID,
                                             context=ctx)
def devices(self):
"""
Return a list of known device objects belonging to this board group.
"""
return [dev for dev in self.fpgaServer.devices.values()
if dev.boardGroup == self]
    @inlineCallbacks
    def testMode(self, func, *a, **kw):
        """
        Call a function in test mode.

        This makes sure that all currently-executing pipeline stages
        are finished by acquiring the pipe semaphore for all pages,
        then runs the function, and finally releases the semaphore
        to allow the pipeline to continue.

        Returns (via deferred) whatever func(*a, **kw) returns.
        """
        # Draining every semaphore slot guarantees exclusive access.
        for i in xrange(NUM_PAGES):
            yield self.pipeSemaphore.acquire()
        try:
            ans = yield func(*a, **kw)
            returnValue(ans)
        finally:
            for i in xrange(NUM_PAGES):
                self.pipeSemaphore.release()
    def makePackets(self, runners, page, reps, timingOrder, sync=249):
        """Make packets to run a sequence on this board group.

        Running a sequence has 4 stages:
        - Load memory and SRAM into all boards in parallel.
          If possible, this is done in the background using a separate
          page while another sequence is running.
        - Run sequence by firing a single packet that starts all boards.
          To ensure synchronization the slaves are started first, in
          daisy-chain order, followed at the end by the master.
        - Collect timing data to ensure that the sequence is finished.
          We instruct the direct ethernet server to collect the packets
          but not send them yet. Once collected, direct ethernet triggers
          are used to immediately start the next sequence if one was
          loaded into the next page.
        - Read timing data.
          Having started the next sequence (if one was waiting) we now
          read the timing data collected by the direct ethernet server,
          process it and return it.

        This function prepares the LabRAD packets that will be sent for
        each of these steps, but does not actually send anything. By
        preparing these packets in advance we save time later when we
        are in the time-critical pipeline sections.

        Returns (loadPkts, setupPkts, runPkts, collectPkts, readPkts):
        loadPkts: list of packets, one for each board
        setupPkts: list of (packet, setup state). Only for ADC
        runPkts: wait, run, both. These packets are sent in the master
            context, and are placed carefully in order so that the
            master board runs last.
        collectPkts: list of packets, one for each board. These packets
            tell the direct ethernet server to collect, and then
            if successful, send triggers to the master context.
        readPkts: list of packets. Simply read back data from direct
            ethernet buffer for each board's context.

        Packets generated by dac and adc objects are made with the
        context set to that device's context. This ensures that the
        packets have the right destination MAC and therefore arrive in
        the right place.

        NOTE(review): reps is accepted but never referenced in this body;
        confirm whether it is still needed in the signature.
        """
        # Dictionary of devices to be run.
        runnerInfo = dict((runner.dev.devName, runner) for runner in runners)
        # Upload sequence data (pipelined).
        loadPkts = []
        for board in self.boardOrder:
            if board in runnerInfo:
                runner = runnerInfo[board]
                # The first board that produces a load packet is the master.
                isMaster = len(loadPkts) == 0
                p = runner.loadPacket(page, isMaster)
                if p is not None:
                    loadPkts.append(p)
        # Setup board state (not pipelined).
        # Build a list of (setupPacket, setupState).
        setupPkts = []
        for board in self.boardOrder:
            if board in runnerInfo:
                runner = runnerInfo[board]
                p = runner.setupPacket()
                if p is not None:
                    setupPkts.append(p)
        # Run all boards (master last).
        # Set the first board which is both in the boardOrder and also in the
        # list of runners for this sequence as the master. Any subsequent boards
        # for which we have a runner are set to slave mode, while subsequent
        # unused boards are set to idle mode. For example:
        # All boards: 000000
        # runners:    --XX-X
        # mode:         msis (i: idle, m: master, s: slave) -DTS
        boards = []  # List of (<device object>, <register bytes to write>).
        for board, delay in zip(self.boardOrder, self.boardDelays):
            if board in runnerInfo:
                runner = runnerInfo[board]
                slave = len(boards) > 0
                regs = runner.runPacket(page, slave, delay, sync)
                boards.append((runner.dev, regs))
            elif len(boards):
                # This board is after the master, but will not itself run, so
                # we put it in idle mode.
                dev = self.fpgaServer.devices[board]  # Look up device wrapper.
                if isinstance(dev, dac.DAC):
                    regs = dev.regIdle(delay)
                    boards.append((dev, regs))
                elif isinstance(dev, adc.ADC):
                    # ADC boards always pass through signals, so no need for
                    # Idle mode.
                    pass
        boards = boards[1:] + boards[:1]  # move master to the end.
        runPkts = self.makeRunPackets(boards)
        # Collect and read (or discard) timing results.
        seqTime = max(runner.seqTime for runner in runners)
        collectPkts = [runner.collectPacket(seqTime, self.ctx)
                       for runner in runners]
        readPkts = [runner.readPacket(timingOrder) for runner in runners]
        return loadPkts, setupPkts, runPkts, collectPkts, readPkts
    def makeRunPackets(self, data):
        """Create packets to run a set of boards.

        There are two options as to how this can work, depending on
        whether the setup state from the previous run is the same as
        for this run. If no changes to the setup state are required,
        then we can wait for triggers and immediately start the next
        run; this is what the 'both' packet does. If the setup state
        has changed, we must wait for triggers, then send setup packets,
        and then start the next run. This two-stage operation is what
        the 'wait' and 'run' packets do. We create both here because
        we can't tell until it is our turn in the pipe which method
        will be used.

        data: list of (device, register data) pairs in run order
            (master last, as arranged by makePackets).

        Returns the (wait, run, both) direct ethernet packets.
        """
        wait = self.directEthernetServer.packet(context=self.ctx)
        run = self.directEthernetServer.packet(context=self.ctx)
        both = self.directEthernetServer.packet(context=self.ctx)
        if LOGGING_PACKET:
            wait = LoggingPacket(wait, name='run=wait')
            run = LoggingPacket(run, name='run=run')
            both = LoggingPacket(both, name='run=both')
        # Wait for triggers and discard them. The actual number of triggers to
        # wait for will be decided later. The 0 is a placeholder here.
        wait.wait_for_trigger(0, key='nTriggers')
        both.wait_for_trigger(0, key='nTriggers')
        # Run all boards.
        for dev, regs in data:
            bytes = regs.tostring()
            # We must switch to each board's destination MAC each time we write
            # data because our packets for the direct ethernet server are in
            # the main context of the board group, and therefore do not have a
            # specific destination MAC.
            run.destination_mac(dev.MAC).write(bytes)
            both.destination_mac(dev.MAC).write(bytes)
        return wait, run, both
    @inlineCallbacks
    def run(self, runners, reps, setupPkts, setupState, sync, getTimingData,
            timingOrder):
        """Run a sequence on this board group.

        runners: list of board runner objects, one per board to run.
        reps: number of repetitions (forwarded to makePackets).
        setupPkts: list of caller-supplied setup packets.
        setupState: set describing the setup implied by setupPkts.
        sync: SRAM synchronization parameter passed into the run packets.
        getTimingData: if True, parse and return data per timingOrder;
            if False, data is read but nothing is returned.
        timingOrder: list of data channel names ('<board>', or
            '<board>::<channel>' for ADC demod channels) selecting which
            data to return and in what order.

        Returns (via deferred) a tuple with one entry per timingOrder
        element when getTimingData is True.

        Raises TimeoutError if any board fails to collect in time.
        """
        # Check whether this sequence will fit in just one page.
        if all(runner.pageable() for runner in runners):
            # Lock just one page.
            page = self.pageNums.next()
            pageLocks = [self.pageLocks[page]]
        else:
            # Start on page 0 and set pageLocks to all pages.
            print 'Paging off: SRAM too long.'
            page = 0
            pageLocks = self.pageLocks
        # Prepare packets.
        logging.info('making packets')
        pkts = self.makePackets(runners, page, reps, timingOrder, sync)
        loadPkts, boardSetupPkts, runPkts, collectPkts, readPkts = pkts
        # Add setup packets from boards (ADCs) to that provided in the args:
        # setupPkts is a list.
        # setupState is a set.
        setupPkts.extend(pkt for pkt, state in boardSetupPkts)
        setupState.update(state for pkt, state in boardSetupPkts)
        try:
            yield self.pipeSemaphore.acquire()
            logging.info('pipe semaphore acquired')
            try:
                # Stage 1: load.
                for pageLock in pageLocks:  # Lock pages to be written.
                    yield pageLock.acquire()
                logging.info('page locks acquired')
                # Send load packets. Do not wait for response. We already
                # acquired the page lock, so sending data to SRAM and memory is
                # kosher at this time.
                # TODO: Need to check what 'load packets' is for ADC and make
                # sure sending load packets here is ok.
                loadDone = self.sendAll(loadPkts, 'Load')
                # stage 2: run
                # Send a request for the run lock, do not wait for response.
                runNow = self.runLock.acquire()
                try:
                    yield loadDone  # wait until load is finished.
                    yield runNow  # Wait for acquisition of the run lock.
                    logging.info('run lock acquired')
                    # Set the number of triggers needed before we can actually
                    # run. We expect to get one trigger for each board that
                    # had to run and return data. This is the number of
                    # runners in the previous sequence.
                    logging.info(
                        'num prev triggers: {}'.format(self.prevTriggers))
                    waitPkt, runPkt, bothPkt = runPkts
                    waitPkt['nTriggers'] = self.prevTriggers
                    bothPkt['nTriggers'] = self.prevTriggers
                    # store the number of triggers for the next run
                    self.prevTriggers = len(runners)
                    logging.info('num runners: {}'.format(len(runners)))
                    # If the passed in setup state setupState, or the current
                    # actual setup state, self.setupState are empty, we need
                    # to set things up. Also if the desired setup state isn't
                    # a subset of the actual one, we need to set up.
                    # XXX Check what setup state means for ADC. Should this
                    # include the trigger/demodulator tables or not?
                    needSetup = ((not setupState) or (not self.setupState) or
                                 (not (setupState <= self.setupState)))
                    if needSetup:
                        logging.info('needSetup = True')
                        # we require changes to the setup state so first, wait
                        # for triggers indicating that the previous run has
                        # collected.
                        # If this fails, something BAD happened!
                        r = yield waitPkt.send()
                        logging.info('waitPkt sent')
                        try:
                            # Then set up
                            logging.info('sending setupPkts...')
                            yield self.sendAll(setupPkts, 'Setup')
                            logging.info('...setupPkts sent')
                            self.setupState = setupState
                        except Exception as e:
                            # if there was an error, clear setup state
                            logging.info('catching setupPkts exception')
                            self.setupState = set()
                            logging.error(
                                'Exception in setupPkts: {}'.format(e))
                            raise e
                        # and finally run the sequence
                        logging.info('sending runPkt...')
                        yield runPkt.send()
                        logging.info('...runPkt sent')
                    else:
                        # if this fails, something BAD happened!
                        logging.info('need setup = false')
                        r = yield bothPkt.send()
                    # Keep track of how long the packet waited before being
                    # able to run.
                    # XXX How does this work? Why is r['nTriggers'] the wait
                    # time?
                    # print "fpga server: r['nTriggers']: %s" % (r['nTriggers'])
                    self.runWaitTimes.append(r['nTriggers']['s'])
                    # Keep only the 100 most recent wait times.
                    if len(self.runWaitTimes) > 100:
                        self.runWaitTimes.pop(0)
                    yield self.readLock.acquire()  # wait for our turn to read
                    logging.info('read lock acquired')
                    # stage 3: collect
                    # Collect appropriate number of packets and then trigger
                    # the master context.
                    collectAll = defer.DeferredList(
                        [p.send() for p in collectPkts], consumeErrors=True)
                    logging.info('waiting for collect packets')
                finally:
                    # by releasing the runLock, we allow the next sequence to
                    # send its run packet. if our collect fails due to a
                    # timeout, however, our triggers will not all be sent to
                    # the run context, so that it will stay blocked until
                    # after we cleanup and send the necessary triggers
                    # We now release the run lock. Other users will now be
                    # able to send their run packet to the direct ethernet,
                    # but the direct ethernet will not actually send run
                    # commands to the FPGA boards until the master context
                    # receives all expected triggers. These triggers are sent
                    # along with the collect packets, and succeed only if the
                    # collect commands do not time out. This means that the
                    # boards won't run until either our collect succeeds,
                    # meaning we're finished running and got all expected
                    # data, or we clean up from a timeout and manually send
                    # the necessary number of triggers. Note that we release
                    # the run lock IMMEDIATELY after sending the request to
                    # collect so that other users can get going ASAP.
                    # The direct ethernet server will allow other run commands
                    # to go as soon as our triggers are received, but only if
                    # that run command has been sent!
                    self.runLock.release()
                    logging.info('run lock released')
                # Wait for data to be collected.
                results = yield collectAll
                logging.info('results collected')
            finally:
                for pageLock in pageLocks:
                    pageLock.release()
                logging.info('page lock released')
            # check for a timeout and recover if necessary
            if not all(success for success, result in results):
                for success, result in results:
                    if not success:
                        result.printTraceback()
                yield self.recoverFromTimeout(runners, results)
                self.readLock.release()
                raise TimeoutError(self.timeoutReport(runners, results))
            # stage 4: read
            # no timeout, so go ahead and read data
            boardOrder = [runner.dev.devName for runner in runners]
            readAll = self.sendAll(readPkts, 'Read', boardOrder)
            self.readLock.release()
            # This line scales really badly with increasing stats
            # At 9600 stats the next line takes 10s out of 20s per
            # sequence.
            results = yield readAll  # wait for read to complete
            if getTimingData:
                answers = []
                # Cache of already-parsed data from a particular board.
                # Prevents un-flattening a packet more than once.
                extractedData = {}
                for dataChannelName in timingOrder:
                    if '::' in dataChannelName:
                        # If dataChannelName has :: in it, it's an ADC
                        # with specified demod channel
                        boardName, channel = dataChannelName.split('::')
                        channel = int(channel)
                    elif 'DAC' in dataChannelName:
                        raise RuntimeError('DAC data readback not supported')
                    elif 'ADC' in dataChannelName:
                        # ADC average mode
                        boardName = dataChannelName
                        channel = None
                    else:
                        raise RuntimeError('channel format not understood')
                    if boardName in extractedData:
                        # If we have already parsed the packet for this
                        # board, fetch the cached result.
                        extracted = extractedData[boardName]
                    else:
                        # Otherwise, extract data, cache it, and add
                        # relevant part to the list of returned data
                        idx = boardOrder.index(boardName)
                        runner = runners[idx]
                        result = [data for src, dest, eth, data in
                                  results[idx]['read']]
                        # Array of all timing results (DAC)
                        extracted = runner.extract(result)
                        extractedData[boardName] = extracted
                    # Add extracted data to list of data to be returned
                    if channel != None:
                        # If this is an ADC demod channel, grab that
                        # channel's data only
                        extractedChannel = extracted[0][channel]
                    else:
                        extractedChannel = extracted
                    answers.append(extractedChannel)
                returnValue(tuple(answers))
        finally:
            self.pipeSemaphore.release()
@inlineCallbacks
def sendAll(self, packets, info, infoList=None):
"""Send a list of packets and wrap them up in a deferred list."""
# Remove packets which contain no actual requests.
packets = [p for p in packets if p._packet]
results = yield defer.DeferredList([p.send() for p in packets],
consumeErrors=True) # [(success, result)...]
if all(s for s, r in results):
# return the list of results
returnValue([r for s, r in results])
else:
# create an informative error message
msg = 'Error(s) occured during {}:\n'.format(info)
if infoList is None:
msg += (''.join(r.getBriefTraceback()
for s, r in results if not s))
else:
for i, (s, r) in zip(infoList, results):
m = 'OK' if s else ('error!\n' + r.getBriefTraceback())
msg += '{} : {}\n\n'.format(i, m)
raise Exception(msg)
def extractTiming(self, packets):
"""Extract timing data coming back from a readPacket."""
data = ''.join(data[3:63] for data in packets)
return np.fromstring(data, dtype='<u2').astype('u4')
    @inlineCallbacks
    def recoverFromTimeout(self, runners, results):
        """Recover from a timeout error so that pipelining can proceed.

        The recovery proceeds as follows:

        (1) Get execution counts. For each board we clear the packet buffer and
        ping the board to see how many times it executed its SRAM sequence (or
        demod sequence for ADC boards). This count is stored in the runner
        object for the board for later reporting to the user.

        (2) Send triggers. After all boards have been pinged, we again clear
        the packet buffers for all boards and then send a trigger to the board
        group run context from each failed board. We must do this to unlock the
        run context since the trigger would not have been sent yet if packet
        collection failed.

        Args:
            runners: board runner objects, one per board in the sequence.
            results: list of (success, result) pairs, parallel to runners.
        """
        print 'RECOVERING FROM TIMEOUT'
        # Get execution counts.
        for runner, (success, result) in zip(runners, results):
            # Drop any queued packets so the ping response is unambiguous.
            yield runner.dev.clear().send()
            try:
                # NOTE: in the current implementation of regPing for DAC boards
                # (build 15) the start field is set to master, which means when
                # we ping these boards they will emit daisy chain signals.
                p = runner.dev.regPingPacket()
                p.timeout(U.Value(1.0, 's')).read(1)
                resp = yield p.send()
                regs = runner.dev.processReadback(resp.read[0][3])
                # May be None if this board's readback lacks the counter;
                # timeoutReport then shows 'unknown'.
                runner.executionCount = regs.get('executionCounter', None)
            except Exception:
                # Best effort: a board that fails to answer the ping simply
                # has no execution count to report. Log and carry on so the
                # remaining boards are still pinged and triggered.
                logging.error('Exception in recoverFromTimeout', exc_info=True)
        # Send triggers.
        for runner, (success, result) in zip(runners, results):
            yield runner.dev.clear().send()
            if not success:
                # Unlock the board group run context: the normal trigger was
                # never sent because packet collection failed for this board.
                yield runner.dev.trigger(self.ctx).send()
def timeoutReport(self, runners, results):
"""Create a nice error message explaining which boards timed out."""
lines = ['Some boards failed:']
for runner, (success, result) in zip(runners, results):
line = '{name}: {state}. Executions: expected={expected}, actual={actual}'.format(
name=runner.dev.devName,
state='OK' if success else 'timeout!',
expected=runner.reps,
actual=getattr(runner, 'executionCount', 'unknown')
)
lines.append(line)
return '\n'.join(lines)
class FPGAServer(DeviceServer):
    """Server for GHz DAC and ADC boards."""
    # LabRAD server name under which this server registers.
    name = 'GHz FPGAs'
    # NOTE(review): retry count — its use is not visible in this part of the
    # file; confirm where retries is consumed before changing it.
    retries = 5
@inlineCallbacks
def initServer(self):
self.boardGroups = {}
yield DeviceServer.initServer(self)
@inlineCallbacks
def loadBoardGroupConfig(self):
print 'Loading board group definitions from registry...'
p = self.client.registry.packet()
p.cd(['', 'Servers', 'GHz FPGAs'], True)
p.get('boardGroups', True, [], key='boardGroups')
ans = yield p.send()
print 'Board group definitions loaded.'
# validate board group definitions
valid = True
names = set()
adapters = set()
for name, server, port, boards in ans['boardGroups']:
if name in names:
print 'Multiple board groups with name "{}"'.format(name)
valid = False
names.add(name)
if (server, port) in adapters:
print 'Multiple board groups for adapter ({}, {})'.format(
server, port)
valid = False
adapters.add((server, port))
if valid:
self.boardGroupDefs = ans['boardGroups']
print 'Board group definitions ok.'
else:
print 'Please fix the board group configuration.'
@inlineCallbacks
def adapterExists(self, server, port):
"""Check whether the specified ethernet adapter exists."""
cxn = self.client
if server not in cxn.servers:
returnValue(False)
else:
de = cxn.servers[server]
adapters = yield de.adapters()
if len(adapters):
ports, names = zip(*adapters)
else:
ports, names = [], []
returnValue(port in ports)
@inlineCallbacks
def findDevices(self):
print 'Refreshing client connection...'
cxn = self.client
yield cxn.refresh()
# Reload the board group configuration from the registry.
yield self.loadBoardGroupConfig() # Creates self.boardGroupDefs
config = dict(((server, port), (name, boards)) # The keys are tuples!
for name, server, port, boards in self.boardGroupDefs)
# Determine what board groups are to be added, removed and kept as is.
existing = set(self.boardGroups.keys())
configured = set((server, port)
for _, server, port, _ in self.boardGroupDefs)
additions = configured - existing
removals = existing - configured
keepers = existing - removals
# Check each addition to see whether the desired server/port exists.
for key in set(additions):
server, port = key
exists = yield self.adapterExists(server, port)
if not exists:
print ('Adapter "{}" (port {}) does not exist. Group will not '
'be added.'.format(server, port))
additions.remove(key)
# Check each keeper to see whether the server/port still exists.
for key in set(keepers):
server, port = key
exists = yield self.adapterExists(server, port)
if not exists:
print ('Adapter "{}" (port {}) does not exist. Group will be '
'removed.'.format(server, port))
keepers.remove(key)
removals.add(key)
print 'Board groups to be added:', additions
print 'Board groups to be removed:', removals
# Remove board groups which are no longer configured.
for key in removals:
bg = self.boardGroups[key]
del self.boardGroups[key]
try:
yield bg.shutdown()
except Exception, e:
logging.error('Error removing board group: {}'.format(key),
exc_info=True)
# Add new board groups.
for server, port in additions:
name, boards = config[server, port]
print ('Creating board group "{}": server="{}", port={}'
.format(name, server, port))
de = cxn.servers[server]
boardGroup = BoardGroup(self, de, port) # Sets attributes.