diff --git a/server/cf.conf b/server/cf.conf index 55dffe8..59ec2bf 100644 --- a/server/cf.conf +++ b/server/cf.conf @@ -35,15 +35,28 @@ procs = cf # stored in /etc/channelfinderapi.conf as described in the client [cf] -# a space-separated list of infotags to set as CF Properties -#infotags = archive foo bar blah -# List environment variables that should be written as channel finder properties +# cf-store application +# Uncomment line below to turn on the feature to add alias records to channelfinder +# alias = on +# Uncomment line below to turn on the feature to add EPICS record type to channelfinder +# recordType = on +# Uncomment line below to turn on the feature to add description field to channelfinder +# recordDesc = on +# The size limit for finding channels (ie the value of the '~size' query parameter) +# If not specified then the fallback is the server default +# findSizeLimit = 10000 +# Mark all channels as 'Inactive' when processor is stopped (default: True) +# cleanOnStop = True +# Mark all channels as 'Inactive' when processor is started (default: True) +# cleanOnStart = True +# Specify an optional id for the recceiver to be used with cleanOnStart and cleanOnStop +# default value is the hostname of the machine the python interpreter is started on +# recceiverID = recc1 +# Debug output file location. +# Produces no file when not defined. +# debug_file_loc = /home/devuser/recsyncdata.txt # # Comma-separated list of VARIABLE:PropertyName, # specifying which environment VARIABLEs to pass on to the channel finder # and defining the corresponding PropertyName #environment_vars=ENGINEER:Engineer,EPICS_BASE:EpicsVersion,PWD:WorkingDirectory -# Turn on optional alias and recordType properties -#alias = on -#recordType = on -#recordDesc = on diff --git a/server/demo.conf b/server/demo.conf index d9c6cca..5a4ceab 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -78,9 +78,12 @@ idkey = 42 # If not specified then the fallback is the server default # findSizeLimit = 10000 # Mark all channels as 'Inactive' when processor is stopped (default: True) -#cleanOnStop = True +# cleanOnStop = True # Mark all channels as 'Inactive' when processor is started (default: True) -#cleanOnStart = True +# cleanOnStart = True +# Specify an optional id for the recceiver to be used with cleanOnStart and cleanOnStop +# default value is the hostname of the machine the python interpreter is started on +# recceiverID = recc1 # Debug output file location. # Produces no file when not defined. # debug_file_loc = /home/devuser/recsyncdata.txt diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 94b5ef1..c51aaba 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import logging +import socket _log = logging.getLogger(__name__) from zope.interface import implementer @@ -17,6 +18,8 @@ import datetime import os import json +from channelfinder import ChannelFinderClient + # ITRANSACTION FORMAT: # @@ -30,6 +33,8 @@ __all__ = ['CFProcessor'] +RECCEIVERID_KEY = 'recceiverID' +RECCEIVERID_DEFAULT = socket.gethostname() @implementer(interfaces.IProcessor) class CFProcessor(service.Service): @@ -68,11 +73,10 @@ def _startServiceWithLock(self): Using the default python cf-client. The url, username, and password are provided by the channelfinder._conf module. """ - from channelfinder import ChannelFinderClient self.client = ChannelFinderClient() try: cf_props = [prop['name'] for prop in self.client.getAllProperties()] - reqd_props = {'hostName', 'iocName', 'pvStatus', 'time', 'iocid'} + reqd_props = {'hostName', 'iocName', 'pvStatus', 'time', 'iocid', RECCEIVERID_KEY} if (self.conf.get('alias', 'default') == 'on'): reqd_props.add('alias') if (self.conf.get('recordType', 'default') == 'on'): @@ -224,7 +228,7 @@ def _commitWithThread(self, TR): pvInfoByName = {} for rid, (info) in pvInfo.items(): if info["pvName"] in pvInfoByName: - _log.warn("Commit contains multiple records with PV name: %s (%s)", pv, iocid) + _log.warn("Commit contains multiple records with PV name: %s (%s)", info["pvName"], iocid) continue pvInfoByName[info["pvName"]] = info _log.debug("Add record: %s: %s", rid, info) @@ -246,31 +250,26 @@ def _commitWithThread(self, TR): self.iocs[iocid]["channelcount"] += 1 for pv in delrec: if iocid in self.channel_dict[pv]: - self.channel_dict[pv].remove(iocid) - if iocid in self.iocs: - self.iocs[iocid]["channelcount"] -= 1 - if self.iocs[iocid]['channelcount'] == 0: - self.iocs.pop(iocid, None) - elif self.iocs[iocid]['channelcount'] < 0: - _log.error("Channel count negative: %s", iocid) - if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs - del self.channel_dict[pv] + self.remove_channel(pv, iocid) """In case, alias exists""" if (self.conf.get('alias', 'default' == 'on')): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: for a in pvInfoByName[pv]["aliases"]: - self.channel_dict[a].remove(iocid) - if iocid in self.iocs: - self.iocs[iocid]["channelcount"] -= 1 - if self.iocs[iocid]['channelcount'] == 0: - self.iocs.pop(iocid, None) - elif self.iocs[iocid]['channelcount'] < 0: - _log.error("Channel count negative: %s", iocid) - if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs - del self.channel_dict[a] + self.remove_channel(a, iocid) poll(__updateCF__, self, pvInfoByName, delrec, hostName, iocName, iocid, owner, time) dict_to_file(self.channel_dict, self.iocs, self.conf) + def remove_channel(self, a, iocid): + self.channel_dict[a].remove(iocid) + if iocid in self.iocs: + self.iocs[iocid]["channelcount"] -= 1 + if self.iocs[iocid]['channelcount'] == 0: + self.iocs.pop(iocid, None) + elif self.iocs[iocid]['channelcount'] < 0: + _log.error("Channel count negative: %s", iocid) + if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs + del self.channel_dict[a] + def clean_service(self): """ Marks all channels as "Inactive" until the recsync server is back up @@ -278,10 +277,11 @@ def clean_service(self): sleep = 1 retry_limit = 5 owner = self.conf.get('username', 'cfstore') + recceiverid = self.conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT) while 1: try: _log.info("CF Clean Started") - channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active')])) + channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)])) if channels is not None: new_channels = [] for ch in channels or []: @@ -326,6 +326,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels_dict = proc.channel_dict iocs = proc.iocs conf = proc.conf + recceiverid = conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT) new = set(pvInfoByName.keys()) if iocid in iocs: @@ -354,12 +355,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if len(new) == 0 or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc if ch[u'name'] in channels_dict: ch[u'owner'] = iocs[channels_dict[ch[u'name']][-1]]["owner"] - ch[u'properties'] = __merge_property_lists([ - {u'name': 'hostName', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["hostname"]}, - {u'name': 'iocName', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["iocname"]}, - {u'name': 'iocid', u'owner': owner, u'value': channels_dict[ch[u'name']][-1]}, - {u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], + ch[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch), ch[u'properties']) if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties']) @@ -371,12 +367,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io for a in pvInfoByName[ch[u'name']]["aliases"]: if a[u'name'] in channels_dict: a[u'owner'] = iocs[channels_dict[a[u'name']][-1]]["owner"] - a[u'properties'] = __merge_property_lists([ - {u'name': 'hostName', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["hostname"]}, - {u'name': 'iocName', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["iocname"]}, - {u'name': 'iocid', u'owner': owner, u'value': channels_dict[a[u'name']][-1]}, - {u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], + a[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch), a[u'properties']) if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties']) @@ -463,11 +454,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io raise defer.CancelledError() for pv in new: - newProps = [{u'name': 'hostName', u'owner': owner, u'value': hostName}, - {u'name': 'iocName', u'owner': owner, u'value': iocName}, - {u'name': 'iocid', u'owner': owner, u'value': iocid}, - {u'name': 'pvStatus', u'owner': owner, u'value': "Active"}, - {u'name': 'time', u'owner': owner, u'value': iocTime}] + newProps = create_properties(owner, iocTime, recceiverid, hostName, iocName, iocid) if (conf.get('recordType', 'default') == 'on'): newProps.append({u'name': 'recordType', u'owner': owner, u'value': pvInfoByName[pv]['recordType']}) if pv in pvInfoByName and "infoProperties" in pvInfoByName[pv]: @@ -521,6 +508,22 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if proc.cancelled: raise defer.CancelledError() +def create_properties(owner, iocTime, recceiverid, hostName, iocName, iocid): + return [ + {u'name': 'hostName', u'owner': owner, u'value': hostName}, + {u'name': 'iocName', u'owner': owner, u'value': iocName}, + {u'name': 'iocid', u'owner': owner, u'value': iocid}, + {u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, + {u'name': 'time', u'owner': owner, u'value': iocTime}, + {u'name': RECCEIVERID_KEY, u'owner': owner, u'value': recceiverid}] + +def ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch): + return create_properties(owner, iocTime, recceiverid, + iocs[channels_dict[ch[u'name']][-1]]["hostname"], + iocs[channels_dict[ch[u'name']][-1]]["iocname"], + channels_dict[ch[u'name']][-1]) + + def __merge_property_lists(newProperties, oldProperties): """ Merges two lists of properties ensuring that there are no 2 properties with