-
Notifications
You must be signed in to change notification settings - Fork 0
/
VDS.py
724 lines (605 loc) · 31.1 KB
/
VDS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
# Copyright []
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
### BEGIN NODE INFO
[info]
name = Virtual Device Server
version = 0.3
description = Virtual Device Server for LabRAD. Handles dedicated output channels
[startup]
cmdline = %PYTHON% %FILE%
timeout = 20
[shutdown]
message = 987654321
timeout = 20
### END NODE INFO
"""
from labrad.server import LabradServer, setting, Signal
from twisted.internet.defer import inlineCallbacks, returnValue
import labrad.units as units
from labrad.types import Value
###########################
## ChannelInstance class ##
###########################
class ChannelInstance(object):
    """Plain record holding everything the server knows about one channel.

    The constructor stores each argument, unchanged, on an attribute of the
    same name.  The attributes fall into four groups:

    * bookkeeping: ``context``, ``ID``, ``name``, ``label``,
      ``description``, ``tags``
    * capabilities: ``has_get``, ``has_set``
    * "get" request: ``get_setting``, ``get_inputs``, ``get_inputs_units``
    * "set" request: ``set_setting``, ``set_var_slot``, ``set_var_units``,
      ``set_statics``, ``set_statics_units``, ``set_min``, ``set_max``,
      ``set_offset``, ``set_scale``

    ``get_setting`` and ``set_setting`` are indexed by the server as
    ``[0]`` server, ``[1]`` device and ``[2]`` setting.
    """

    def __init__(
        self,
        context,
        ID,
        name,
        label,
        description,
        tags,
        has_get,
        has_set,
        get_setting,
        get_inputs,
        get_inputs_units,
        set_setting,
        set_var_slot,
        set_var_units,
        set_statics,
        set_statics_units,
        set_min,
        set_max,
        set_offset,
        set_scale,
    ):
        # Bookkeeping / informational attributes.
        self.context = context
        self.ID = ID
        self.name = name
        self.label = label
        self.description = description
        self.tags = tags

        # Which operations the channel supports.
        self.has_get = has_get
        self.has_set = has_set

        # How to perform a "get".
        self.get_setting = get_setting
        self.get_inputs = get_inputs
        self.get_inputs_units = get_inputs_units

        # How to perform a "set".
        self.set_setting = set_setting
        self.set_var_slot = set_var_slot
        self.set_var_units = set_var_units
        self.set_statics = set_statics
        self.set_statics_units = set_statics_units

        # Limits and linear transform applied to a value before it is set.
        self.set_min = set_min
        self.set_max = set_max
        self.set_offset = set_offset
        self.set_scale = set_scale
#def __repr__(self):
# return """Channel Instance Object with < ID:{ID} name:{name} >\n\n{description}""".format(ID=self.ID,name=self.name,description=self.description)
#def __str__(self):
# return """Channel Instance Object with < ID:{ID} name:{name} >\n\n{description}""".format(ID=self.ID,name=self.name,description=self.description)
###############################
## Formatting/data functions ##
###############################
def to_type(value, type_):
    """Convert ``value`` according to the type/unit string ``type_``.

    * A ``type_`` starting with ``'.'`` is always treated as a unit: the text
      after the dot is passed to ``Value`` together with ``value``.  Because
      this check runs first, it also covers unit names that would otherwise
      be taken for one of the aliases below (e.g. ``'.s'``).
    * ``'string'``, ``'str'``, ``'s'``      -> ``str(value)``
    * ``'float'``, ``'f'``, ``'v'``, ``''`` -> ``float(value)``
    * ``'integer'``, ``'int'``, ``'i'``     -> ``int(value)``
    * anything else                         -> ``Value(value, type_)``
    """
    if type_.startswith('.'):
        return Value(value, type_[1:])

    # (aliases, converter) pairs, tried in the same order as listed above.
    plain_casts = (
        (('string', 'str', 's'), str),
        (('float', 'f', 'v', ''), float),
        (('integer', 'int', 'i'), int),
    )
    for aliases, cast in plain_casts:
        if type_ in aliases:
            return cast(value)

    return Value(value, type_)
def assemble_set_list(set_var_slot, set_var_value, set_statics):
    """Build the argument list for a "set" request.

    The variable value is inserted into the static inputs so that it ends up
    at index ``set_var_slot`` of the returned list; the statics keep their
    relative order around it.

    Parameters:
        set_var_slot:  index the variable value should occupy; valid values
                       are ``0 .. len(set_statics)`` inclusive.
        set_var_value: the value to insert.
        set_statics:   sequence of static inputs (not modified).

    Returns:
        A new list of length ``len(set_statics) + 1``.

    Raises:
        ValueError: if ``set_var_slot`` is negative or larger than
                    ``len(set_statics)``.
    """
    # A negative slot would otherwise be accepted and, through negative
    # slicing, put the variable at an unintended position counted from the end.
    if set_var_slot < 0:
        raise ValueError("Variable slot ({set_var_slot}) must not be negative".format(set_var_slot=set_var_slot))
    if set_var_slot > len(set_statics):
        raise ValueError("Variable slot ({set_var_slot}) higher than highest input slot ({h_slot})".format(set_var_slot=set_var_slot, h_slot=len(set_statics)))

    statics = list(set_statics)
    return statics[:set_var_slot] + [set_var_value] + statics[set_var_slot:]
class VirtualDeviceServer(LabradServer):
    """
    Virtual Device Server.
    Handles usage of dedicated channels

    Channel definitions are stored in the LabRAD registry below
    ``channel_location``, one folder per channel named ``"<ID> (<name>)"``
    with ``get`` and ``set`` sub-folders.  On start-up every channel is
    loaded into ``channels_by_id`` / ``channels_by_name``.
    """
    name = 'virtual_device_server'                                  # server name (as appears in pylabrad connections)
    channel_location = ['', 'virtual_device_server', 'channels']    # registry location of channel information
    none_types = ['none', 'None', '-', '']                          # these strings will be interpreted as <None> by the VDS

    channels_by_id = {}      # These start out empty
    channels_by_name = {}    # and are replaced by freshly loaded dicts in initServer

    sPrefix = 704000
    signal__reg_channel_added = Signal(sPrefix+0, "signal__reg_channel_added", "*s")        # Activated when a new channel is added; parameters = [ID,name]
    signal__reg_channel_deleted = Signal(sPrefix+1, "signal__reg_channel_deleted", "*s")    # Activated when a channel is deleted ; parameters = [ID,name]
    signal__channel_set = Signal(sPrefix+2, "signal__channel_set", "*s")                    # Activated when a channel is set ; parameters = [ID,name,response]
    signal__channel_get = Signal(sPrefix+3, "signal__channel_get", "(ssv)")                 # Activated when a channel is gotten ; parameters = [ID,name,response]

    @inlineCallbacks
    def initServer(self):
        """Connect to the registry, make sure the channel folder exists and load all channels."""
        self.reg = self.client.registry             # more convenient connection to the registry
        self.reg_context = self.client.context()    # context for registry operations
        yield self.registry_setup()                 # set up the registry directory if it hasn't been already
        self.channels_by_id, self.channels_by_name = yield self.load_all_channels()

    #######################
    ## Registry handling ##
    #######################

    @inlineCallbacks
    def registry_setup(self):
        """Change into ``channel_location``, creating it if necessary (second ``cd`` argument True)."""
        yield self.reg.cd(self.channel_location, True, context=self.reg_context)

    @inlineCallbacks
    def get_attributes(self, attribute):
        """Gets a list of all channels' values for a specified attribute.

        Reads key ``attribute`` from every folder directly below
        ``channel_location`` and restores the previous registry directory
        afterwards.
        """
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        attrs = []
        yield self.reg.cd(self.channel_location, context=ctx)
        folders, files = yield self.reg.dir(context=ctx)
        for folder in folders:
            yield self.reg.cd(self.channel_location+[folder], context=ctx)
            attr = yield self.reg.get(attribute, context=ctx)
            attrs.append(attr)
        yield self.reg.cd(prev_dir, context=ctx)
        returnValue(attrs)

    @inlineCallbacks
    def get_folders_by_attribute(self, attribute, value):
        """Return the names of all channel folders whose key ``attribute`` equals ``value``."""
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        attr_folders = []
        yield self.reg.cd(self.channel_location, context=ctx)
        folders, files = yield self.reg.dir(context=ctx)
        for folder in folders:
            yield self.reg.cd(self.channel_location+[folder], context=ctx)
            fval = yield self.reg.get(attribute, context=ctx)
            if fval == value:
                attr_folders.append(folder)
        yield self.reg.cd(prev_dir, context=ctx)
        returnValue(attr_folders)

    @inlineCallbacks
    def del_folder(self, folder_loc, recur=False):
        """Removes a folder & its keys. If recur is set to True, recursively removes subfolders & subfolder keys.

        ``folder_loc`` is a folder name, or a list of path components,
        relative to ``channel_location``.  With ``recur`` False sub-folders
        are left alone and only the keys are deleted before ``rmdir``.
        """
        if not isinstance(folder_loc, list):
            folder_loc = [folder_loc]
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        yield self.reg.cd(self.channel_location+folder_loc, context=ctx)
        folders, files = yield self.reg.dir(context=ctx)
        if recur:
            for folder in folders:
                yield self.del_folder(folder_loc+[folder], True)
        for key in files:
            yield self.reg.del_(key, context=ctx)
        # step up to the parent so the now-empty folder can be removed
        yield self.reg.cd(self.channel_location+folder_loc[:-1], context=ctx)
        yield self.reg.rmdir(folder_loc[-1], context=ctx)
        try:
            yield self.reg.cd(prev_dir, context=ctx)
        except Exception:
            # the previous directory may have been the one just deleted;
            # fall back to the channel root
            yield self.reg.cd(self.channel_location, context=ctx)

    @inlineCallbacks
    def get_folder_by_id_name(self, ID=None, name=None):
        """Get the name of a folder by specifying its name and/or ID
        Must specify at least one of name,ID
        If both are specified, they must point to the same folder

        Raises ValueError when neither is given, when nothing (or more than
        one folder) matches, or when ID and name match different folders.
        """
        if (ID is None) and (name is None):
            raise ValueError("ID and name cannot both be None: at least one must be specified")

        if ID is not None:
            byID = yield self.get_folders_by_attribute("ID", ID)
            if len(byID) == 0:
                raise ValueError("No channels match the ID (%s) given" % ID)
            if len(byID) > 1:
                raise ValueError("Multiple channels have the same ID (%s); this should not happen." % ID)
            byID = byID[0]

        if name is not None:
            byName = yield self.get_folders_by_attribute("name", name)
            if len(byName) == 0:
                raise ValueError("No channels match the name (%s) given" % name)
            if len(byName) > 1:
                # the name is interpolated here; previously the message contained a literal "%s"
                raise ValueError("Multiple channels have the same name (%s); this should not happen. " % name)
            byName = byName[0]

        if (ID is not None) and (name is not None):
            if byID != byName:
                raise ValueError("The given ID (%s) and name (%s) do not match; they belong to <%s> and <%s> respectively." % (ID, name, byID, byName))

        if ID is not None:
            returnValue(byID)
        returnValue(byName)

    @inlineCallbacks
    def write_channel_to_registry(
            self,
            ID,
            name,
            label,
            description,
            tags,
            has_get,
            has_set,
            get_setting,
            get_inputs,
            get_inputs_units,
            set_setting,
            set_var_slot,
            set_var_units,
            set_statics,
            set_statics_units,
            set_min,
            set_max,
            set_offset,
            set_scale,
            ):
        """Write one channel to the registry.

        Creates (if needed) the folder ``"<ID> (<name>)"`` below
        ``channel_location`` plus its ``get`` and ``set`` sub-folders, and
        stores every argument under the key shown below.  The previous
        registry directory is restored at the end.
        """
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        yield self.reg.cd(self.channel_location, True, context=ctx)

        entryName = "%s (%s)" % (ID, name)
        # (sub-path, [(key, value), ...]) in the order they are written
        sections = [
            # informational attributes
            ([entryName], [
                ("name", name),
                ("ID", ID),
                ("label", label),
                ("description", description),
                ("tags", tags),
                ("has_get", has_get),
                ("has_set", has_set),
            ]),
            # <get> folder
            ([entryName, "get"], [
                ("setting", get_setting),
                ("inputs", get_inputs),
                ("inputs_units", get_inputs_units),
            ]),
            # <set> folder
            ([entryName, "set"], [
                ("setting", set_setting),
                ("var_slot", set_var_slot),
                ("var_units", set_var_units),
                ("statics", set_statics),
                ("statics_units", set_statics_units),
                ("min", set_min),
                ("max", set_max),
                ("offset", set_offset),
                ("scale", set_scale),
            ]),
        ]
        for subpath, entries in sections:
            yield self.reg.cd(self.channel_location+subpath, True, context=ctx)
            for key, val in entries:
                yield self.reg.set(key, val, context=ctx)

        yield self.reg.cd(prev_dir, context=ctx)

    @inlineCallbacks
    def del_channel_from_registry(self, ID=None, name=None):
        """Removes a channel from the registry"""
        folder = yield self.get_folder_by_id_name(ID, name)
        yield self.del_folder(folder, True)

    @inlineCallbacks
    def load_channel_by_id_name(self, ID=None, name=None):
        """Loads info from the registry to a channel object"""
        channel_folder = yield self.get_folder_by_id_name(ID, name)
        channel = yield self.load_channel(channel_folder)
        returnValue(channel)

    @inlineCallbacks
    def load_channel(self, channel_folder):
        """Loads a channel from the registry to a ChannelInstance object.

        ``get`` inputs and ``set`` statics are converted with ``to_type``
        using their stored unit strings; min/max/offset/scale are converted
        with ``bound_interp`` (float or None).  Each loaded channel gets a
        fresh client context.
        """
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)

        # informational stuff
        yield self.reg.cd(self.channel_location+[channel_folder], context=ctx)
        ID = yield self.reg.get("ID", context=ctx)
        name = yield self.reg.get("name", context=ctx)
        label = yield self.reg.get("label", context=ctx)
        description = yield self.reg.get("description", context=ctx)
        tags = yield self.reg.get("tags", context=ctx)
        has_get = yield self.reg.get("has_get", context=ctx)
        has_set = yield self.reg.get("has_set", context=ctx)

        # <get> folder
        yield self.reg.cd(self.channel_location+[channel_folder, "get"], context=ctx)
        get_setting = yield self.reg.get("setting", context=ctx)
        get_inputs = yield self.reg.get("inputs", context=ctx)
        get_inputs_units = yield self.reg.get("inputs_units", context=ctx)
        # indexed (not zipped) on purpose: a units list shorter than the
        # inputs list raises instead of silently dropping inputs
        get_inputs = [to_type(get_inputs[n], get_inputs_units[n]) for n in range(len(get_inputs))]

        # <set> folder
        yield self.reg.cd(self.channel_location+[channel_folder, "set"], context=ctx)
        set_setting = yield self.reg.get("setting", context=ctx)
        set_var_slot = yield self.reg.get("var_slot", context=ctx)
        set_var_units = yield self.reg.get("var_units", context=ctx)
        set_statics = yield self.reg.get("statics", context=ctx)
        set_statics_units = yield self.reg.get("statics_units", context=ctx)
        set_min = yield self.reg.get("min", context=ctx)
        set_max = yield self.reg.get("max", context=ctx)
        set_offset = yield self.reg.get("offset", context=ctx)
        set_scale = yield self.reg.get("scale", context=ctx)
        set_statics = [to_type(set_statics[n], set_statics_units[n]) for n in range(len(set_statics))]

        # These are stored as strings in the registry but we need them to be
        # <None> if appropriate, or floats.  bound_interp converts "none" or
        # empty strings to <None>, and otherwise to floats.
        set_min = yield self.bound_interp(set_min)
        set_max = yield self.bound_interp(set_max)
        set_offset = yield self.bound_interp(set_offset)
        set_scale = yield self.bound_interp(set_scale)

        yield self.reg.cd(prev_dir, context=ctx)

        channel = ChannelInstance(
            self.client.context(),                      # a unique context for this channel
            ID, name, label, description, tags,         # informational attributes
            has_get, has_set,                           # has_get & has_set
            get_setting, get_inputs, get_inputs_units,  # <GET> info
            set_setting, set_var_slot, set_var_units,   # <SET> info
            set_statics, set_statics_units,             # <SET> info
            set_min, set_max, set_offset, set_scale,    # <SET> info
        )
        returnValue(channel)

    @inlineCallbacks
    def load_all_channels(self):
        """Loads all channels from registry & returns dicts by ID & by name.

        A folder that cannot be loaded is reported on stdout and deleted
        from the registry (recursively).
        """
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        yield self.reg.cd(self.channel_location, context=ctx)
        folders, files = yield self.reg.dir(context=ctx)
        channels = []
        for channel_folder in folders:
            try:
                channel = yield self.load_channel(channel_folder)
                channels.append(channel)
            except Exception:
                print("Found invalid folder: {channel_folder}; deleting it".format(channel_folder=channel_folder))
                yield self.del_folder(channel_folder, True)
        yield self.reg.cd(prev_dir, context=ctx)
        by_id = dict((channel.ID, channel) for channel in channels)
        by_name = dict((channel.name, channel) for channel in channels)
        returnValue([by_id, by_name])

    @inlineCallbacks
    def reg_modify(self, channel_folder, attribute, new_value, subfolder=None):
        """Set key ``attribute`` to ``new_value`` in a channel folder (or in its ``subfolder``)."""
        ctx = self.reg_context
        prev_dir = yield self.reg.cd(context=ctx)
        yield self.reg.cd(self.channel_location+[channel_folder], context=ctx)
        if subfolder:
            yield self.reg.cd(subfolder, context=ctx)
        yield self.reg.set(attribute, new_value, context=ctx)
        yield self.reg.cd(prev_dir, context=ctx)

    @inlineCallbacks
    def bound_interp(self, bound):
        """Converts none-interpretable strings into None types, and all other strings to floats"""
        low = yield bound.lower()
        if low in self.none_types:
            returnValue(None)
        else:
            returnValue(float(bound))

    def _coerce_bound(self, value):
        """Return ``value`` as a float, or None if it is None or a none-type string.

        Non-generator counterpart of ``bound_interp`` that also accepts
        values which are not strings.  Raises whatever ``float()`` raises
        for uninterpretable input.
        """
        if value is None:
            return None
        # duck-typed so that both str and unicode are handled
        if hasattr(value, 'lower') and value.lower() in self.none_types:
            return None
        return float(value)

    #######################
    ## Channel functions ##
    #######################

    @inlineCallbacks
    def get_channel_by_id_name(self, ID=None, name=None):
        """Returns a channel specified by name and/or ID.

        Empty values are treated like None.  Raises ValueError if neither is
        given, if either is unknown, or if they refer to different channels.
        """
        yield   # the only yield: keeps this a generator so returnValue() works
        if not ID:
            ID = None
        if not name:
            name = None
        if (ID is None) and (name is None):
            raise ValueError("ID and name can't both be None or empty")

        if ID is not None:
            try:
                by_id = self.channels_by_id[ID]
            except KeyError:
                raise ValueError("Invalid ID: {ID}; does not correspond to any channel.".format(ID=ID))
        if name is not None:
            try:
                by_name = self.channels_by_name[name]
            except KeyError:
                raise ValueError("Invalid name: {name}; does not correspond to any channel.".format(name=name))

        if ID is None:          # only the name was specified
            channel = by_name
        elif name is None:      # only the ID was specified
            channel = by_id
        else:                   # both specified: they must validly point to the same channel
            if (by_id.ID != by_name.ID) or (by_id.name != by_name.name):
                raise ValueError("Name and ID point to different channels. If both specified they must point to the same channel.")
            channel = by_id
        returnValue(channel)

    @inlineCallbacks
    def _call_device_setting(self, setting_path, context, *args):
        """Call a device-server setting, selecting the device and retrying once on failure.

        ``setting_path`` is indexed as [0] server, [1] device, [2] setting.
        The setting is first called with ``args`` in ``context``; if that
        raises, ``select_device`` is called on the server and the request is
        sent again.  An error from the second attempt is not caught, so the
        caller sees the real error message.
        """
        server_name = setting_path[0]
        setting_name = setting_path[2]
        try:
            ret = yield self.client[server_name][setting_name](*args, context=context)
        except Exception:
            yield self.client[server_name].select_device(setting_path[1], context=context)
            ret = yield self.client[server_name][setting_name](*args, context=context)
        returnValue(ret)

    ##############
    ## Settings ##
    ##############

    @setting(1, "reg add channel",
             ID='s',
             name='s',
             label='s',
             description='s',
             tags='*s',
             has_get='b',
             has_set='b',
             get_setting='*s',
             get_inputs='*s',
             get_inputs_units='*s',
             set_setting='*s',
             set_var_slot='i',
             set_var_units='s',
             set_statics='*s',
             set_statics_units='*s',
             set_min='s',       # These will end up as either floats or <None>
             set_max='s',       # But to allow for either they are strings
             set_offset='s',    # The strings must be interpretable as either
             set_scale='s',     # floats or None types.
             returns='b{success}')
    def reg_add_channel(self, c, ID, name, label, description, tags, has_get, has_set,
                        get_setting, get_inputs, get_inputs_units,
                        set_setting, set_var_slot, set_var_units, set_statics, set_statics_units,
                        set_min, set_max, set_offset, set_scale):
        """Adds a new channel to the registry.
        Does not override; to overwrite, first delete the old channel."""
        # make sure ID is valid
        try:
            id_number = int(ID)
        except (TypeError, ValueError):
            raise ValueError("ID specified is invalid. Was {ID}, must be non-negative integer.".format(ID=ID))
        if id_number < 0:
            raise ValueError("ID specified is negative. Was {ID}, must be non-negative integer.".format(ID=ID))

        # check that name and ID aren't taken
        IDs = yield self.get_attributes("ID")
        names = yield self.get_attributes("name")
        if ID in IDs:
            raise ValueError("ID specified is already taken.")
        if name in names:
            raise ValueError("Name specified is already taken.")

        # check that all values in [min,max,scale,offset] are correctly interpretable
        for inst in [set_min, set_max, set_offset, set_scale]:
            if inst.lower() in self.none_types:
                continue
            try:
                float(inst)
            except ValueError:
                raise ValueError("Value ({inst}) for minValue,maxValue,scale,offset not interpetable as either float or NoneType".format(inst=inst))

        # write the channel entry
        yield self.write_channel_to_registry(
            ID, name, label, description, tags, has_get, has_set,
            get_setting, get_inputs, get_inputs_units,
            set_setting, set_var_slot, set_var_units, set_statics, set_statics_units,
            set_min, set_max, set_offset, set_scale)

        # Now load & add the new channel
        channel = yield self.load_channel_by_id_name(ID=ID)
        self.channels_by_id[ID] = channel
        self.channels_by_name[name] = channel

        # done & successful
        self.signal__reg_channel_added([ID, name])
        returnValue(True)

    @setting(2, "reg del channel", ID='s', name='s', returns='b{success}')
    def reg_del_channel(self, c, ID, name=""):
        """Deletes a channel specified by name, ID, or both"""
        if not ID:
            ID = None
        if not name:
            name = None
        if (ID is None) and (name is None):
            raise ValueError("Error: <ID> and <name> cannot both be None or empty")
        # fill in whichever identifier was omitted from the loaded channels
        if ID is None:
            ID = self.channels_by_name[name].ID
        if name is None:
            name = self.channels_by_id[ID].name
        yield self.del_channel_from_registry(ID, name)
        del self.channels_by_name[name]
        del self.channels_by_id[ID]
        self.signal__reg_channel_deleted([ID, name])
        returnValue(True)

    @setting(100, "list channels", returns='**s')
    def list_channels(self, c):
        """Returns a list of all loaded channels in the form [ [ID,name], [ID,name], ... ]"""
        keys = yield self.channels_by_id.keys()
        returnValue([[str(key), self.channels_by_id[key].name] for key in keys])

    #@setting(101,"list active channels",returns='**s')
    #def list_active_channel(self,c):
    #    yield

    @setting(102, "list channel details", ID='s', name='s', returns='(ssss*sbb*s*?*s*sis*?*svvvv)')
    def list_channel_details(self, c, ID, name=""):
        """Returns the details of a given channel in the form of a list (ID,name,label,description,tags,has_get,has_set,get_setting,get_inputs,get_inputs_units,set_setting,set_var_slot,set_var_units,set_statics,set_statics_units,set_min,set_max,set_offset,set_scale)"""
        channel = yield self.get_channel_by_id_name(ID, name)
        # NOTE(review): set_min/set_max/set_offset/set_scale may be None (see
        # bound_interp) while the declared return type uses 'v' for them --
        # confirm how that is flattened.
        returnValue([
            channel.ID,
            channel.name,
            channel.label,
            channel.description,
            channel.tags,
            channel.has_get,
            channel.has_set,
            channel.get_setting,
            channel.get_inputs,
            channel.get_inputs_units,
            channel.set_setting,
            channel.set_var_slot,
            channel.set_var_units,
            channel.set_statics,
            channel.set_statics_units,
            channel.set_min,
            channel.set_max,
            channel.set_offset,
            channel.set_scale,
        ])

    @setting(103, "modify channel details", modifications='*(s?)', ID='s', name='s', returns='b{success}')
    def modify_channel_details(self, c, modifications, ID, name=""):
        """Modify the attributes of a channel. modifications = [ [attribute, new_value], ... ] Example: [['set_min',0],['set_max',1]] will change the enforced lower bound for the set() command to 0, and the upper bound to 1.

        All modifications are validated first; nothing is changed if any of
        them is invalid.  set_min/set_max/set_offset/set_scale accept a
        number, None, or one of the none-type strings.  ID and name cannot
        be changed with this setting.
        """
        channel = yield self.get_channel_by_id_name(ID, name)
        ch_attrs = yield channel.__dict__.keys()    # list of ChannelInstance attributes
        bound_attrs = ('set_min', 'set_max', 'set_offset', 'set_scale')

        # make sure that all attribute changes are valid (valid attribute, and correct data type)
        typed_modifications = []
        for mod in modifications:
            attr = mod[0]
            new_val = mod[1]

            if attr not in ch_attrs:
                raise ValueError("Invalid attribute specified. Was <{attr}>; valid attributes are <{ch_attrs}>".format(attr=attr, ch_attrs=ch_attrs))

            cur_val = getattr(channel, attr)
            new_val_type = type(new_val)
            cur_val_type = type(cur_val)
            try:
                if attr in bound_attrs:
                    # These are float-or-None: converting with type(cur_val)
                    # would fail whenever the current value is None.
                    new_val = self._coerce_bound(new_val)
                elif cur_val_type is not new_val_type:
                    new_val = cur_val_type(new_val)     # test if types are compatible
                typed_modifications += [[attr, new_val]]
            except Exception:
                raise ValueError("Incompatible types or uninterpretable data for attribute <{attr}>; current is ({cur_val}, type {cur_val_type}), new is ({new_val}, type {new_val_type})".format(attr=attr, cur_val=cur_val, cur_val_type=cur_val_type, new_val=new_val, new_val_type=new_val_type))

            # For now changing name or ID with this method is not allowed.
            # This is because we would have to change the name of the folder in the registry, which is not a simple task.
            # TODO: when this is allowed, the new value must be checked for
            # uniqueness (get_attributes), channels_by_id / channels_by_name
            # must be re-keyed, and the registry folder "<ID> (<name>)" renamed.
            if attr in ['ID', 'name']:
                raise ValueError("Cannot change ID or name yet with this method. If you wish to change the ID and/or name, you must delete the channel and create a new one. You will be able to change ID / name with this command in the future.")

        # all the modifications in typed_modifications must be valid, so we may continue
        for attr, new_val in typed_modifications:
            # modify the attribute of the ChannelInstance object
            setattr(channel, attr, new_val)

            # updating the registry
            channel_folder = yield self.get_folder_by_id_name(channel.ID, channel.name)

            # no subfolder items
            if attr in ["ID", "name", "label", "description", "tags", "has_get", "has_set"]:
                yield self.reg_modify(channel_folder, attr, new_val)

            # <get> subfolder items (registry key is the attribute without its "get_" prefix)
            if attr.startswith('get_'):
                if attr == 'get_inputs':    # inputs are stored as strings in the registry regardless of final type, so we convert them before writing
                    yield self.reg_modify(channel_folder, attr[4:], [str(obj) for obj in new_val], subfolder='get')
                else:
                    yield self.reg_modify(channel_folder, attr[4:], new_val, subfolder='get')

            # <set> subfolder items (registry key is the attribute without its "set_" prefix)
            if attr.startswith('set_'):
                if attr == 'set_statics':   # statics (inputs) stored as strings, as above.
                    yield self.reg_modify(channel_folder, attr[4:], [str(obj) for obj in new_val], subfolder='set')
                elif attr in bound_attrs:   # these are also stored as strings, but individual strings not lists of strings
                    yield self.reg_modify(channel_folder, attr[4:], str(new_val), subfolder='set')
                else:
                    yield self.reg_modify(channel_folder, attr[4:], new_val, subfolder='set')

        # if we got this far we were successful
        returnValue(True)

    @setting(1000, "set channel", ID='s', name='s', value='v', returns='s{response}')
    def set_channel(self, c, value, ID, name=""):
        """Set the output of a channel.
        Channel specified by name and/or ID.
        Output specified by value

        The value sent is (value * set_scale) + set_offset, checked against
        set_min / set_max.  A scale, offset or bound of None means "no
        scaling", "no offset" or "no limit" respectively.
        """
        # How do we tell if a channel is active? Corresponding server, and corresponding device
        # Error if accessing inactive channel?
        # Setting for getting a list of active channels as opposed to all channels?
        #
        # Should get and set both have server & device
        # or should server & device be shared, and get/set only specify settings?
        # for now assume server & device are the same between get & set
        # could cause problems (wrong thing being set) if set & get have different devices, as set & get share the same context
        channel = yield self.get_channel_by_id_name(ID, name)
        if not channel.has_set:
            raise ValueError("Tried to set_channel on a channel that does not support set commands")

        # load_channel stores None for "none"/empty registry entries, so None
        # must not take part in arithmetic or comparisons
        scale = 1 if channel.set_scale is None else channel.set_scale
        offset = 0 if channel.set_offset is None else channel.set_offset
        set_var_value = (value * scale) + offset
        if (channel.set_max is not None) and (set_var_value > channel.set_max):
            raise ValueError("value set (raw:{value}, adjusted:{set_var_value}) exceeds max value:{max}".format(value=value, set_var_value=set_var_value, max=channel.set_max))
        if (channel.set_min is not None) and (set_var_value < channel.set_min):
            raise ValueError("value set (raw:{value}, adjusted:{set_var_value}) deceeds min value:{min}".format(value=value, set_var_value=set_var_value, min=channel.set_min))

        set_var_value = to_type(set_var_value, channel.set_var_units)

        if len(channel.set_statics) == 0:
            # no static inputs: the setting receives the bare value
            payload = set_var_value
        else:
            # otherwise it receives the list of statics with the value inserted
            payload = assemble_set_list(channel.set_var_slot, set_var_value, channel.set_statics)
        ret = yield self._call_device_setting(channel.set_setting, channel.context, payload)

        self.signal__channel_set([channel.ID, channel.name, str(ret)])
        returnValue(str(ret))

    @setting(1001, "get channel", ID='s', name='s', returns='v{value}')
    def get_channel(self, c, ID, name=""):
        """Gets the value (input or set output) of a channel.
        Channel specified by name and/or ID"""
        channel = yield self.get_channel_by_id_name(ID, name)
        if not channel.has_get:
            raise ValueError("Tried to get_channel on a channel that does not support get commands")

        if len(channel.get_inputs) == 0:        # no inputs required: call with context only
            args = ()
        elif len(channel.get_inputs) == 1:      # exactly one input: pass it directly
            args = (channel.get_inputs[0],)
        else:                                   # multiple inputs: pass them as one list
            args = (channel.get_inputs,)
        ret = yield self._call_device_setting(channel.get_setting, channel.context, *args)

        # strip the unit wrapper so the plain number is returned / signalled
        ret = ret._value if type(ret) is units.Value else ret
        self.signal__channel_get([channel.ID, channel.name, ret])
        returnValue(ret)
# Module-level server instance (presumably the name LabRAD tooling looks up -- TODO confirm).
__server__ = VirtualDeviceServer()

if __name__ == '__main__':
    # labrad.util is only needed when the file is run as a script
    from labrad import util
    util.runServer(__server__)