From f1fd91d4d9602af61e2847fdbb04d0cfbe2a47fe Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 18 Jul 2024 17:14:33 +0100 Subject: [PATCH 01/10] Begun adding RulesHandler --- src/p4p/server/nthandlers.py | 148 +++++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 src/p4p/server/nthandlers.py diff --git a/src/p4p/server/nthandlers.py b/src/p4p/server/nthandlers.py new file mode 100644 index 00000000..5442058f --- /dev/null +++ b/src/p4p/server/nthandlers.py @@ -0,0 +1,148 @@ +""" Handler for NTScalar (so far) """ + +import logging +import operator +import time + +from collections import OrderedDict +from enum import Enum +from __future__ import annotations +from typing import Callable + +from p4p import Value +from p4p.server import ServerOperation +from p4p.server.thread import Handler, SharedPV + +logger = logging.getLogger(__name__) + + +class BaseRulesHandler(Handler): + """ + Base class for rules that includes rules common to all PV types. + """ + class RulesFlow(Enum): + """What to do after a rule has been evaluated""" + + CONTINUE = 1 # Continue rules processing + TERMINATE = 2 # Do not process more rules but we're good to here + ABORT = 3 # Stop rules processing and abort put + + def __init__(self) -> None: + # TODO: Removed timestamp so the logic for this will need to replicated somewhere else + + super().__init__() + self._name = None + + self._init_rules : OrderedDict[ + Callable[[dict, Value], Value] + ] = OrderedDict() + + self._put_rules : OrderedDict[ + Callable[[SharedPV, ServerOperation], self.RulesFlow] + ] = OrderedDict() + + + def OnFirstConnect(self, pv : SharedPV) -> None: + """ + This method is called by the pvrecipe after the pv has been created + """ + #Evaluate the timestamp last + self._init_rules.move_to_end("timestamp") + + for post_init_rule_name, post_init_rule in self._init_rules.items(): + logger.debug('Processing post init rule %s', post_init_rule_name) + value = 
post_init_rule(pv.current().raw, pv.current().raw) + if value: + pv.post(value=value) + + def put(self, pv: SharedPV, op: ServerOperation) -> None: + """Put that applies a set of rules""" + self._name = op.name() + logger.debug("Processing attempt to change PV %s by %s (member of %s) at %s", + op.name(), op.account(), op.roles(), op.peer()) + + # oldpvstate : Value = pv.current().raw + newpvstate: Value = op.value().raw + + logger.debug( + "Processing changes to the following fields: %r (value = %s)", + newpvstate.changedSet(), + newpvstate["value"], + ) + + if not self._apply_rules(pv, op): + return + + logger.info( + "Making the following changes to %s: %r", + self._name, + newpvstate.changedSet(), + ) + pv.post(op.value()) # just store and update subscribers + + op.done() + logger.info("Processed change to PV %s by %s (member of %s) at %s", + op.name(), op.account(), op.roles(), op.peer()) + + def _apply_rules(self, pv: SharedPV, op: ServerOperation) -> bool: + """ + Apply the rules, usually when a put operation is attempted + """ + for rule_name, put_rule in self._put_rules.items(): + logger.debug('Applying rule %s', rule_name) + rule_flow = put_rule(pv, op) + + match (rule_flow): + case self.RulesFlow.CONTINUE: + pass + case self.RulesFlow.ABORT: + logger.debug("Rule %s triggered handler abort", rule_name) + op.done() + return False + case self.RulesFlow.TERMINATE: + logger.debug("Rule %s triggered handler terminate", rule_name) + break + case None: + logger.warning( + "Rule %s did not return rule flow. Defaulting to " + "CONTINUE, but this behaviour may change in " + "future.", + rule_name, + ) + case _: + logger.critical("Rule %s returned unhandled return type", rule_name) + raise TypeError( + f"Rule {rule_name} returned unhandled return type {type(rule_flow)}" + ) + + return True + + def set_read_only(self): + """ + Make this PV read only. 
+ """ + self._put_rules["read_only"] = ( + lambda new, old: BaseRulesHandler.RulesFlow.ABORT + ) + self._put_rules.move_to_end("read_only", last=False) + + def _timestamp_rule(self, _, op: ServerOperation) -> RulesFlow: + """Handle updating the timestamps""" + + # Note that timestamps are not automatically handled so we may need to set them ourselves + newpvstate: Value = op.value().raw + self.evaluate_timestamp(_, newpvstate) + + return self.RulesFlow.CONTINUE + + def evaluate_timestamp(self, _ : dict, newpvstate : Value) -> Value: + """ Update the timeStamp of a PV """ + if newpvstate.changed("timeStamp"): + logger.debug("Using timeStamp from put operation") + else: + logger.debug("Generating timeStamp from time.time()") + sec, nsec = time_in_seconds_and_nanoseconds(time.time()) + newpvstate["timeStamp.secondsPastEpoch"] = sec + newpvstate["timeStamp.nanoseconds"] = nsec + + return newpvstate From 4ccfb7c41ad7c7a2b5a7d603449d563ef4bfd275 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sun, 4 Aug 2024 19:32:17 +0100 Subject: [PATCH 02/10] Implementation and example for nthandlers --- example/ntscalar_walarm.py | 19 ++ src/p4p/server/nthandlers.py | 461 +++++++++++++++++++++++++++++++---- 2 files changed, 430 insertions(+), 50 deletions(-) create mode 100644 example/ntscalar_walarm.py diff --git a/example/ntscalar_walarm.py b/example/ntscalar_walarm.py new file mode 100644 index 00000000..fef921c0 --- /dev/null +++ b/example/ntscalar_walarm.py @@ -0,0 +1,19 @@ +import logging +logger = logging.getLogger(__name__) + +from p4p.nt import NTScalar +from p4p.server import Server +from p4p.server.thread import SharedPV +from p4p.server.nthandlers import NTScalarRulesHandler + +# Construct PV with control and valueAlarm structures +# and then set the values of some of those values with a post +pv = SharedPV(nt=NTScalar('d', control=True, valueAlarm=True), + handler=NTScalarRulesHandler(), + initial=12.0) +pv.post({'control.limitHigh': 6, + 'valueAlarm.active': True, 
'valueAlarm.lowAlarmLimit': 1, 'valueAlarm.lowAlarmSeverity':2}) + +Server.forever(providers=[{ + 'demo:pv:name':pv, # PV name only appears here +}]) diff --git a/src/p4p/server/nthandlers.py b/src/p4p/server/nthandlers.py index 5442058f..5c4c6f63 100644 --- a/src/p4p/server/nthandlers.py +++ b/src/p4p/server/nthandlers.py @@ -1,57 +1,166 @@ """ Handler for NTScalar (so far) """ +from __future__ import annotations import logging import operator import time from collections import OrderedDict -from enum import Enum -from __future__ import annotations +from enum import Enum, auto +from typing import SupportsFloat as Numeric # Hack to type hint number types from typing import Callable -from p4p import Value -from p4p.server import ServerOperation -from p4p.server.thread import Handler, SharedPV +from ..wrapper import Value +from . import ServerOperation +from .raw import Handler, SharedPV + + +_log = logging.getLogger(__name__) + logger = logging.getLogger(__name__) +class RulesFlow(Enum): + """ + Used by the BaseRulesHandler to control whether to continue or stop + evaluation of rules in the defined sequence. It may also be used to + set an error message if rule evaluation is aborted. + """ + + CONTINUE = auto() # Continue rules processing + TERMINATE = auto() # Do not process more rules but apply timestamp and complete + TERMINATE_WO_TIMESTAMP = auto() # Do not process further rules; do not apply timestamp rule + ABORT = auto() # Stop rules processing and abort put + + def __init__(self, _) -> None: + # We include an error string so that we can indicate why an ABORT + # has been triggered + self.error : str = None + + def set_errormsg(self, errormsg : str) -> RulesFlow: + """ + Set an error message to explain an ABORT + This function returns the class instance so it may be used in lambdas + """ + self.error = errormsg + + return self class BaseRulesHandler(Handler): """ - Base class for rules that includes rules common to all PV types. 
+ Base class for use as a handler which appplies named rules in a defined order. """ - class RulesFlow(Enum): - """What to do after a rule has been evaluated""" - CONTINUE = 1 # Continue rules processing - TERMINATE = 2 # Do not process more rules but we're good to here - ABORT = 3 # Stop rules processing and abort put + # The BaseRulesHandler uses two OrderedDicts of functions to evaluate + # handler rules. The BaseRulesHandler only includes built-in rules to + # handle read only PVs and to apply timeStamps. Subclasses implement + # rules to handle the standard fields of Normative Types. This base + # class is already able to handle modifications of its existing rules + # and addition of custom user generated rules (though it is expected + # that users will anyway use the provided derived classes in most cases). + # + # There are two types of rule applied by the BaseRulesHandler, but these + # two kinds of rules are usually interconnected as we'll see. + # + ### MERGING + # But first we need to address an issue which complicates handler logic. + # Imagine a PV which has a value and a control field: + # 'example:pv': { + # 'value': 3, + # 'control': { + # 'limitLow' : -1, + # 'limitHigh' : 10, + # 'minStep' : 1 + # } + # } + # + # In the case of a put of '{value: 11}' the value will become 10. + # In the case of a put of '{limitHigh: 5}' the value will become 5 and the + # limitHigh will become 5. Note that in this case the value will change + # even though the put operation did not directly change it. + # In the case of a put of '{value: -3, limitLow: -5}' the value will + # become -3 and the limitLow will become -5. But the *order of evaluation + # now matters*. We need to evaluate the value against the new limitLow and + # not the old one. + # + # In general we must merge the old and new state of a field such as control + # during a put and only then can the handler correctly evaluate the results. 
+ # + ### RULES + # As noted the BaseRulesHandler maintains two different OrderedDict queues + # of named rules (i.e. functions). Note that the call signatures of these + # two types of rules are different. + # + ## 1. Init Rules + # The first kind of rule is an init rule which is directly called when the + # handler's onFirstCall is called, i.e. the first time the state of the PV + # must be resolved, usually prompted by a first get/put/monitor. The PV has + # only its initial state and no prior state, nor does it have a ServerOperation + # describing an operation in progress. + # + # The init rules have a function signature of + # evaluate_init_rule(combinedvals, pvstate : Value) -> None | Value: + # where + # - combinedvals is a merger of the previous state of the PV and the + # new state of the PV for the relevant field(s). For evaluation during an + # init this is not required. + # - pvstate is the p4p.Value to be evaluated + # + # The init rule should return None if no action is to be taken, and a + # p4p.Value which will be p4p.post() if a change in the state of the PV + # is required. + # + # 2. Put Rules + # Put rules are those rules evaluated when a put operation is performed. + # + # The put rules have a function signature of + # evaluate_put_rule(pv, op) -> RulesFlow + # - pv is a p4p.Value which encapsulates the current state of the pv + # - op is a p4p.ServerOperation which describes the changes requested by + # the post + # + # A return value of RulesFlow allows control of the rules, including + # ABORTing if the put performs an illegal operation. + # + ### SPECIAL RULES + # The two rules automatically included by BaseRulesHandler are given special + # treatment during rules evaluation. The rule 'read_only' is always evaluated + # first. The rule 'timestamp' is always evaluated last. These are included + # as otherwise ordinary rules so that they may be replaced if desired. 
+ # def __init__(self) -> None: - # TODO: Removed timestamp so the logic for this will need to replicated somewhere else - super().__init__() - self._name = None + + # Name is used purely for logging. Because the name of the PV is stored by + # the Server and not the PV object associated with this handler we can't + # determine the name until the first put operation + self._name = None # Used purely for logging self._init_rules : OrderedDict[ Callable[[dict, Value], Value] ] = OrderedDict() + self._init_rules["timestamp"] = self.evaluate_timestamp + self._put_rules : OrderedDict[ - Callable[[SharedPV, ServerOperation], self.RulesFlow] + Callable[[SharedPV, ServerOperation], RulesFlow] ] = OrderedDict() + self._put_rules["timestamp"] = self._timestamp_rule - def OnFirstConnect(self, pv : SharedPV) -> None: + def onFirstConnect(self, pv : SharedPV) -> None: # pylint: disable=invalid-name """ - This method is called by the pvrecipe after the pv has been created + This method is called when the PV is first accessed. It applies the init_rules """ - #Evaluate the timestamp last + # Evaluate the timestamp last self._init_rules.move_to_end("timestamp") - for post_init_rule_name, post_init_rule in self._init_rules.items(): - logger.debug('Processing post init rule %s', post_init_rule_name) - value = post_init_rule(pv.current().raw, pv.current().raw) + # TODO: Why is this different to _apply_rules? 
It doesn't have the same + # RulesFlow logic and it posts step by step rather than once at the end + for init_rule_name, init_rule in self._init_rules.items(): + logger.debug('Processing post init rule %s', init_rule_name) + value = init_rule(pv.current().raw, pv.current().raw) if value: pv.post(value=value) @@ -73,7 +182,7 @@ def put(self, pv: SharedPV, op: ServerOperation) -> None: if not self._apply_rules(pv, op): return - logger.info( + logger.debug( "Making the following changes to %s: %r", self._name, newpvstate.changedSet(), @@ -81,7 +190,7 @@ def put(self, pv: SharedPV, op: ServerOperation) -> None: pv.post(op.value()) # just store and update subscribers op.done() - logger.info("Processed change to PV %s by %s (member of %s) at %s", + logger.debug("Processed change to PV %s by %s (member of %s) at %s", op.name(), op.account(), op.roles(), op.peer()) def _apply_rules(self, pv: SharedPV, op: ServerOperation) -> bool: @@ -92,39 +201,55 @@ def _apply_rules(self, pv: SharedPV, op: ServerOperation) -> bool: logger.debug('Applying rule %s', rule_name) rule_flow = put_rule(pv, op) - match (rule_flow): - case self.RulesFlow.CONTINUE: - pass - case self.RulesFlow.ABORT: - logger.debug("Rule %s triggered handler abort", rule_name) - op.done() - return False - case self.RulesFlow.TERMINATE: - logger.debug("Rule %s triggered handler terminate", rule_name) - break - case None: - logger.warning( + # Originally a more elegant match (rule_flow): but we need + # to support versions of Python prior to 3.10 + if not rule_flow: + logger.warning( "Rule %s did not return rule flow. 
Defaulting to " "CONTINUE, but this behaviour may change in " "future.", rule_name, - ) - case _: - logger.critical("Rule %s returned unhandled return type", rule_name) - raise TypeError( - f"Rule {rule_name} returned unhandled return type {type(rule_flow)}" - ) + ) + elif rule_flow == RulesFlow.CONTINUE: + pass + elif rule_flow == RulesFlow.ABORT: + logger.debug("Rule %s triggered handler abort", rule_name) + + errormsg = None + if rule_flow.error: + errormsg = rule_flow.error + + op.done(error=errormsg) + return False + elif rule_flow == RulesFlow.TERMINATE: + logger.debug("Rule %s triggered handler terminate", rule_name) + self._put_rules['timestamp'](pv, op) + break + elif rule_flow == RulesFlow.TERMINATE_WO_TIMESTAMP: + logger.debug("Rule %s triggered handler terminate without timestamp", rule_name) + break + else: + logger.critical("Rule %s returned unhandled return type", rule_name) + raise TypeError( + f"Rule {rule_name} returned unhandled return type {type(rule_flow)}" + ) return True - def set_read_only(self): + def set_read_only(self, read_only: bool = True): """ Make this PV read only. 
+ If read_only == False then the PV is made writable """ - self._put_rules["read_only"] = ( - lambda new, old: BaseRulesHandler.RulesFlow.ABORT - ) - self._put_rules.move_to_end("read_only", last=False) + if read_only: + # Switch on the read-only rule and make sure it's the first rule + self._put_rules["read_only"] = ( + lambda new, old: RulesFlow(RulesFlow.ABORT).set_errormsg("read only PV") + ) + self._put_rules.move_to_end("read_only", last=False) + else: + # Switch off the read-only rule by deleting it + self._put_rules.pop("read_only", None) def _timestamp_rule(self, _, op: ServerOperation) -> RulesFlow: """Handle updating the timestamps""" @@ -133,7 +258,7 @@ def _timestamp_rule(self, _, op: ServerOperation) -> RulesFlow: newpvstate: Value = op.value().raw self.evaluate_timestamp(_, newpvstate) - return self.RulesFlow.CONTINUE + return RulesFlow.CONTINUE def evaluate_timestamp(self, _ : dict, newpvstate : Value) -> Value: """ Update the timeStamp of a PV """ @@ -141,8 +266,244 @@ def evaluate_timestamp(self, _ : dict, newpvstate : Value) -> Value: logger.debug("Using timeStamp from put operation") else: logger.debug("Generating timeStamp from time.time()") - sec, nsec = time_in_seconds_and_nanoseconds(time.time()) - newpvstate["timeStamp.secondsPastEpoch"] = sec - newpvstate["timeStamp.nanoseconds"] = nsec + + timenow = time.time() + seconds = int(timenow // 1) + nanoseconds = int((timenow % 1) * 1e9) + + newpvstate["timeStamp.secondsPastEpoch"] = seconds + newpvstate["timeStamp.nanoseconds"] = nanoseconds return newpvstate + + def _combined_pvstates( + self, oldpvstate: Value, newpvstate: Value, interests: str | list[str] + ) -> dict: + """ + Combine the current state of the PV and that in progress from a + ServerOperation, extracting the specified list of interests (e.g. + "control", "valueAlarm", etc.). Note that value is always included. 
+ The merger prioritises infromation from the ServerOperation, using + the current state of the PV to fill in any information not in the + ServerOperation. + A dictionary with the merged interests is returned. + """ + + # This is complicated! We may need to process alarms based on either + # the oldstate or the newstate of the PV. Suppose, for example, the + # valueAlarm limits have all been set in the PV but it is not yet active. + # Now a value change and valueAlarms.active=True comes in. We have to + # act on the new value of the PV (and its active state) but using the + # old values for the limits! + # NOTE: We can get away without deepcopies because we never change any + # of these values + # TODO: What if valueAlarm has been added or removed? + + def extract_combined_value(newpvstate, oldpvstate, key): + """Check a key. If it isn't marked as changed return the old PV state value, + and if it is return the new PV state value + """ + if newpvstate.changed(key): + return newpvstate[key] + + return oldpvstate[key] + + combinedvals = {} + combinedvals["value"] = extract_combined_value(newpvstate, oldpvstate, "value") + + if isinstance(interests, str): + interests = [interests] + + for interest in interests: + combinedvals[interest] = extract_combined_value( + newpvstate, oldpvstate, interest + ) + for key in newpvstate[interest]: + fullkey = f"{interest}.{key}" + combinedvals[fullkey] = extract_combined_value( + newpvstate, oldpvstate, fullkey + ) + + return combinedvals + +class NTScalarRulesHandler(BaseRulesHandler): + """ + Rules handler for NTScalar PVs. 
+ """ + def __init__(self) -> None: + super().__init__() + + self._init_rules.update({ + 'control' : self.evaluate_control_limits, + 'alarm_limit' : self.evaluate_alarm_limits + }) + + self._put_rules["control"] = self._controls_rule + self._put_rules["alarm_limit"] = self._alarm_limit_rule + self._put_rules.move_to_end("timestamp") + + def _controls_rule(self, pv: SharedPV, op: ServerOperation) -> RulesFlow: + """Check whether control limits should trigger and restrict values appropriately""" + logger.debug("Evaluating control limits") + + oldpvstate: Value = pv.current().raw + newpvstate: Value = op.value().raw + + # Check if there are any controls! + if "control" not in newpvstate and "control" not in oldpvstate: + logger.debug("control not present in structure") + return RulesFlow.CONTINUE + + combinedvals = self._combined_pvstates(oldpvstate, newpvstate, "control") + + # Check minimum step first + if ( + abs(newpvstate["value"] - oldpvstate["value"]) + < combinedvals["control.minStep"] + ): + logger.debug(" None | int | Numeric: + """ Check whether a value should be clipped by the control limits """ + + if not 'control' in combinedvals: + # logger.debug("control not present in structure") + return None + + # A philosophical question! What should we do when lowLimit = highLimit = 0? + # This almost certainly means the structure hasn't been initialised, but it could + # be an attempt (for some reason) to lock the value to 0. For now we treat this + # as uninitialised and ignore limits in this case. 
Users will have to handle + # keeping the PV constant at 0 themselves + if ( + combinedvals["control.limitLow"] == 0 + and combinedvals["control.limitHigh"] == 0 + ): + # logger.info( + # "control.limitLow and control.LimitHigh set to 0, so ignoring control limits" + # ) + return None + + # Check lower and upper control limits + if combinedvals["value"] < combinedvals["control.limitLow"]: + value = combinedvals["control.limitLow"] + # logger.debug("Lower control limit exceeded") + return value + + if combinedvals["value"] > combinedvals["control.limitHigh"]: + value = combinedvals["control.limitHigh"] + # logger.debug("Upper control limit exceeded") + return value + + return None + + def __alarm_state_check( + self, combinedvals: dict, newpvstate: Value, alarm_type: str, op=None + ) -> bool: + """Check whether the PV should be in an alarm state """ + if not op: + if alarm_type.startswith("low"): + op = operator.le + elif alarm_type.startswith("high"): + op = operator.ge + else: + raise SyntaxError( + f"CheckAlarms/alarmStateCheck: do not know how to handle {alarm_type}" + ) + + severity = combinedvals[f"valueAlarm.{alarm_type}Severity"] + if ( + op(combinedvals["value"], combinedvals[f"valueAlarm.{alarm_type}Limit"]) + and severity + ): + newpvstate["alarm.severity"] = severity + if not newpvstate.changed("alarm.message"): + newpvstate["alarm.message"] = alarm_type + + logger.debug( + "Setting %s to severity %i",# with message '%s'", + self._name, severity#, newpvstate['alarm.message'] + ) + + return True + + return False + + def _alarm_limit_rule(self, pv: SharedPV, op: ServerOperation) -> RulesFlow: + """ Evaluate alarm limits to see if we should change severity or message""" + oldpvstate: Value = pv.current().raw + newpvstate: Value = op.value().raw + + # Check if there are alarms are present in the structure! 
+ if "alarm" not in newpvstate and "alarm" not in oldpvstate: + logger.debug("alarm not present in structure") + return RulesFlow.CONTINUE + + # Check if valueAlarms are present + if "valueAlarm" not in newpvstate and "valueAlarm" not in oldpvstate: + logger.debug("valueAlarm not present in structure") + return RulesFlow.CONTINUE + + combinedvals = self._combined_pvstates(oldpvstate, newpvstate, ["valueAlarm", "alarm"]) + + self.evaluate_alarm_limits(combinedvals, newpvstate) + return RulesFlow.CONTINUE + + def evaluate_alarm_limits(self, combinedvals, pvstate : Value) -> None | Value: + """ Evaluate alarm value limits """ + # TODO: Apply the rule for hysteresis. Unfortunately I don't understand the + # explanation in the Normative Types specification... + + if 'valueAlarm' not in combinedvals: + logger.debug("valueAlarm not present in structure") + return None + + # Check if valueAlarms are present and active! + if not combinedvals["valueAlarm.active"]: + logger.debug("valueAlarm not active") + return None + + logger.debug("Processing valueAlarm for %s", self._name) + + try: + # The order of these tests is defined in the Normative Types document + if self.__alarm_state_check(combinedvals, pvstate, "highAlarm"): + return pvstate + if self.__alarm_state_check(combinedvals, pvstate, "lowAlarm"): + return pvstate + if self.__alarm_state_check(combinedvals, pvstate, "highWarning"): + return pvstate + if self.__alarm_state_check(combinedvals, pvstate, "lowWarning"): + return pvstate + except SyntaxError: + # TODO: Need more specific error than SyntaxError + return None + + # If we made it here then there are no alarms or warnings and we need to indicate that + # possibly by resetting any existing ones + #combinedvals = self._combined_pvstates(oldpvstate, newpvstate, "alarm") + alarms_changed = False + if combinedvals["alarm.severity"]: + pvstate["alarm.severity"] = 0 + alarms_changed = True + if combinedvals["alarm.message"]: + pvstate["alarm.message"] = "" + 
alarms_changed = True + + if alarms_changed: + logger.debug( + "Setting %s to severity %i with message '%s'", + self._name, pvstate['alarm.severity'], pvstate['alarm.message'] + ) + return pvstate + + logger.debug("Made no automatic changes to alarm state of %s", self._name) + return None From 406ede62b198f80f2305f9e523e572c63d1f792e Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sat, 24 Aug 2024 10:11:45 +0100 Subject: [PATCH 03/10] Add new handler functions with examples and tests --- example/auditor.py | 75 +++++ example/ntscalar_control.py | 141 ++++++++++ src/p4p/server/nthandlers.py | 509 ---------------------------------- src/p4p/server/raw.py | 126 ++++++++- src/p4p/test/test_sharedpv.py | 141 ++++++++++ 5 files changed, 479 insertions(+), 513 deletions(-) create mode 100644 example/auditor.py create mode 100644 example/ntscalar_control.py delete mode 100644 src/p4p/server/nthandlers.py diff --git a/example/auditor.py b/example/auditor.py new file mode 100644 index 00000000..db9ed515 --- /dev/null +++ b/example/auditor.py @@ -0,0 +1,75 @@ +""" +In this example we setup a simple auditing mechanism that reports information +about the last channel changed (which channel, when, and who by). Since we +might need to know this information even when the program is not running we +persist this data to file, including information about when changes could +have been made. 
+""" + +import time + +from p4p.nt.scalar import NTScalar +from p4p.server import Server +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class auditor(Handler): + """Persist information to file so we can audit when the program is closed""" + + def open(self, value): + with open("audit.log", mode="a+") as f: + f.write(f"Auditing opened at {time.ctime()}\n") + + def close(self, pv): + with open("audit.log", mode="a+") as f: + value = pv.current().raw["value"] + if value: + f.write(f"Auditing closed at {time.ctime()}; {value}\n") + else: + f.write(f"Auditing closed at {time.ctime()}; no changes made\n") + + +class audited(Handler): + """Forward information about Put operations to the auditing PV""" + + def __init__(self, pv: SharedPV): + self._audit_pv = pv + + def put(self, pv, op): + pv.post(op.value()) + self._audit_pv.post( + f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}" + ) + op.done() + + +# Setup the PV that will make the audit information available. 
+# Note that there is no put in its handler so it will be externally read-only +auditor_pv = SharedPV(nt=NTScalar("s"), handler=auditor(), initial="") + +# Setup some PVs that will be audited and one that won't be +# Note that the audited handler does have a put so these PVs can be changed externally +pvs = { + "demo:pv:auditor": auditor_pv, + "demo:pv:audited_d": SharedPV( + nt=NTScalar("d"), handler=audited(auditor_pv), initial=9.99 + ), + "demo:pv:audited_i": SharedPV( + nt=NTScalar("i"), handler=audited(auditor_pv), initial=4 + ), + "demo:pv:audited_s": SharedPV( + nt=NTScalar("s"), handler=audited(auditor_pv), initial="Testing" + ), + "demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1), +} +print(pvs.keys()) + +server = None +try: + Server.forever(providers=[pvs]) +except KeyboardInterrupt: + pass +finally: + # We need to close the auditor PV manually, the server stop() won't do it for us + auditor_pv.close() diff --git a/example/ntscalar_control.py b/example/ntscalar_control.py new file mode 100644 index 00000000..34c093fa --- /dev/null +++ b/example/ntscalar_control.py @@ -0,0 +1,141 @@ +from p4p.nt import NTScalar +from p4p.server import Server +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV +from p4p.wrapper import Type, Value + + +class SimpleControl(Handler): + """ + A simple handler that implements the logic for the Control field of a + Normative Type. + """ + + def __init__(self, min_value=None, max_value=None, min_step=None): + # The attentive reader may wonder why we are keeping track of state here + # instead of relying on control.limitLow, control.limitHigh, and + # control.minStep. There are three possible reasons a developer might + # choose an implementation like this: + # - As [Ref1] shows it's not straightforward to maintain state using + # the PV's own fields + # - A developer may wish to have the limits apply as soon as the + # Channel is open. 
If an initial value is set then this may happen + # before the first post(). + # - It is possible to adapt this handler so it could be used without + # a Control field. + # The disadvantage of this simple approach is that clients cannot + # inspect the Control field values until they have been changed. + self._min_value = min_value # Minimum value allowed + self._max_value = max_value # Maximum value allowed + self._min_step = min_step # Minimum change allowed + + def open(self, value): + """ + This function manages all logic when we only need to consider the + (proposed) future state of a PV + """ + # Check if the limitHigh has changed. If it has then we have to reevaluate + # the existing value. Note that for this to work with a post request we + # have to take the actions explained at Ref1 + if value.changed("control.limitHigh"): + self._max_value = value["control.limitHigh"] + if value["value"] > self._max_value: + value["value"] = self._max_value + + if value.changed("control.limitLow"): + self._min_value = value["control.limitLow"] + if value["value"] < self._min_value: + value["value"] = self._min_value + + # If the value has changed we need to check it against the limits and + # change it if any of the limits apply + if value.changed("value"): + if self._max_value and value["value"] > self._max_value: + value["value"] = self._max_value + if self._min_value and value["value"] < self._min_value: + value["value"] = self._min_value + + def post(self, pv, value): + """ + This function manages all logic when we need to know both the + current and (proposed) future state of a PV + """ + # If the minStep has changed update this instance's minStemp value + if value.changed("control.minStep"): + self._min_change = value["control.minStep"] + + # [Ref1] This is where even our simple handler gets complex! + # If the value["value"] has not been changed as part of the post() + # operation then it will be set to a default value (i.e. 0) and + # marked unchanged. 
+ current_value = pv.current().raw + value_changed = True # TODO: Explain this + if not value.changed("value"): + value["value"] = current_value["value"] + value.mark("value", False) + value_changed = False + + # Apply the control limits before the check for minimum change as the + # value may be altered by the limits. + self.open(value) + if not value_changed and value.changed("value"): + return + + if abs(current_value["value"] - value["value"]) < self._min_step: + value.mark("value", False) + + def put(self, pv, op): + """ + In most cases the combination of a put() and post() means that the + put() is solely concerned with issues of authorisation. + """ + # Demo authorisation. + # Only Alice may remotely change the Control limits + # Bob is forbidden from changing anything on this Channel + # Everyone else may change the value but not the Control limits + errmsg = None + if op.account() == "Alice": + pass + elif op.account() == "Bob": + op.done(error="Bob is forbidden to make changes!") + return + else: + if op.value().raw.changed("control"): + errmsg = f"Unauthorised attempt to set Control by {op.account()}" + op.value().raw.mark("control", False) + + # Because we have not set use_handler_post=False in the post this + # will automatically trigger evaluation of the post rule and thus + # the application of + pv.post(op.value()) + op.done(error=errmsg) + + +# Construct PV with control and structures +# and then set the values of some of those values with a post +pv = SharedPV( + nt=NTScalar("d", control=True), + handler=SimpleControl(-1, 11, 2), + initial=12.0, # Immediately limited to 11 due to handler on live above +) +pv.post( + { + "control.limitHigh": 6, # Value now limited to 6 + } +) + + +@pv.put +def handle(pv, op): + pv.post(op.value()) # just store and update subscribers + op.done() + + +print("demo:pv:name: ", pv) +Server.forever( + providers=[ + { + "demo:pv:name": pv, + } + ] +) diff --git a/src/p4p/server/nthandlers.py b/src/p4p/server/nthandlers.py 
deleted file mode 100644 index 5c4c6f63..00000000 --- a/src/p4p/server/nthandlers.py +++ /dev/null @@ -1,509 +0,0 @@ -""" Handler for NTScalar (so far) """ -from __future__ import annotations - -import logging -import operator -import time - -from collections import OrderedDict -from enum import Enum, auto -from typing import SupportsFloat as Numeric # Hack to type hint number types -from typing import Callable - -from ..wrapper import Value -from . import ServerOperation -from .raw import Handler, SharedPV - - -_log = logging.getLogger(__name__) - - -logger = logging.getLogger(__name__) - -class RulesFlow(Enum): - """ - Used by the BaseRulesHandler to control whether to continue or stop - evaluation of rules in the defined sequence. It may also be used to - set an error message if rule evaluation is aborted. - """ - - CONTINUE = auto() # Continue rules processing - TERMINATE = auto() # Do not process more rules but apply timestamp and complete - TERMINATE_WO_TIMESTAMP = auto() # Do not process further rules; do not apply timestamp rule - ABORT = auto() # Stop rules processing and abort put - - def __init__(self, _) -> None: - # We include an error string so that we can indicate why an ABORT - # has been triggered - self.error : str = None - - def set_errormsg(self, errormsg : str) -> RulesFlow: - """ - Set an error message to explain an ABORT - This function returns the class instance so it may be used in lambdas - """ - self.error = errormsg - - return self - -class BaseRulesHandler(Handler): - """ - Base class for use as a handler which appplies named rules in a defined order. - """ - - # The BaseRulesHandler uses two OrderedDicts of functions to evaluate - # handler rules. The BaseRulesHandler only includes built-in rules to - # handle read only PVs and to apply timeStamps. Subclasses implement - # rules to handle the standard fields of Normative Types. 
This base - # class is already able to handle modifications of its existing rules - # and addition of custom user generated rules (though it is expected - # that users will anyway use the provided derived classes in most cases). - # - # There are two types of rule applied by the BaseRulesHandler, but these - # two kinds of rules are usually interconnected as we'll see. - # - ### MERGING - # But first we need to address an issue which complicates handler logic. - # Imagine a PV which has a value and a control field: - # 'example:pv': { - # 'value': 3, - # 'control': { - # 'limitLow' : -1, - # 'limitHigh' : 10, - # 'minStep' : 1 - # } - # } - # - # In the case of a put of '{value: 11}' the value will become 10. - # In the case of a put of '{limitHigh: 5}' the value will become 5 and the - # limitHigh will become 5. Note that in this case the value will change - # even though the put operation did not directly change it. - # In the case of a put of '{value: -3, limitLow: -5}' the value will - # become -3 and the limitLow will become -5. But the *order of evaluation - # now matters*. We need to evaluate the value against the new limitLow and - # not the old one. - # - # In general we must merge the old and new state of a field such as control - # during a put and only then can the handler correctly evaluate the results. - # - ### RULES - # As noted the BaseRulesHandler maintains two different OrderedDict queues - # of named rules (i.e. functions). Note that the call signatures of these - # two types of rules are different. - # - ## 1. Init Rules - # The first kind of rule is an init rule which is directly called when the - # handler's onFirstCall is called, i.e. the first time the state of the PV - # must be resolved, usually prompted by a first get/put/monitor. The PV has - # only its initial state and no prior state, nor does it have a ServerOperation - # describing an operation in progress. 
- # - # The init rules have a function signature of - # evaluate_init_rule(combinedvals, pvstate : Value) -> None | Value: - # where - # - combinedvals is a merger of the previous state of the PV and the - # new state of the PV for the relevant field(s). For evaluation during an - # init this is not required. - # - pvstate is the p4p.Value to be evaluated - # - # The init rule should return None if no action is to be taken, and a - # p4p.Value which will be p4p.post() if a change in the state of the PV - # is required. - # - # 2. Put Rules - # Put rules are those rules evaluated when a put operation is performed. - # - # The put rules have a function signature of - # evaluate_put_rule(pv, op) -> RulesFlow - # - pv is a p4p.Value which encapsulates the current state of the pv - # - op is a p4p.ServerOperation which describes the changes requested by - # the post - # - # A return value of RulesFlow allows control of the rules, including - # ABORTing if the put performs an illegal operation. - # - ### SPECIAL RULES - # The two rules automatically included by BaseRulesHandler are given special - # treatment during rules evaluation. The rule 'read_only' is always evaluated - # first. The rule 'timestamp' is always evaluated last. These are included - # as otherwise ordinary rules so that they may be replaced if desired. - # - - def __init__(self) -> None: - super().__init__() - - # Name is used purely for logging. 
Because the name of the PV is stored by - # the Server and not the PV object associated with this handler we can't - # determine the name until the first put operation - self._name = None # Used purely for logging - - self._init_rules : OrderedDict[ - Callable[[dict, Value], Value] - ] = OrderedDict() - - self._init_rules["timestamp"] = self.evaluate_timestamp - - self._put_rules : OrderedDict[ - Callable[[SharedPV, ServerOperation], RulesFlow] - ] = OrderedDict() - - self._put_rules["timestamp"] = self._timestamp_rule - - def onFirstConnect(self, pv : SharedPV) -> None: # pylint: disable=invalid-name - """ - This method is called when the PV is first accessed. It applies the init_rules - """ - # Evaluate the timestamp last - self._init_rules.move_to_end("timestamp") - - # TODO: Why is this different to _apply_rules? It doesn't have the same - # RulesFlow logic and it posts step by step rather than once at the end - for init_rule_name, init_rule in self._init_rules.items(): - logger.debug('Processing post init rule %s', init_rule_name) - value = init_rule(pv.current().raw, pv.current().raw) - if value: - pv.post(value=value) - - def put(self, pv: SharedPV, op: ServerOperation) -> None: - """Put that applies a set of rules""" - self._name = op.name() - logger.debug("Processing attempt to change PV %s by %s (member of %s) at %s", - op.name(), op.account(), op.roles(), op.peer()) - - # oldpvstate : Value = pv.current().raw - newpvstate: Value = op.value().raw - - logger.debug( - "Processing changes to the following fields: %r (value = %s)", - newpvstate.changedSet(), - newpvstate["value"], - ) - - if not self._apply_rules(pv, op): - return - - logger.debug( - "Making the following changes to %s: %r", - self._name, - newpvstate.changedSet(), - ) - pv.post(op.value()) # just store and update subscribers - - op.done() - logger.debug("Processed change to PV %s by %s (member of %s) at %s", - op.name(), op.account(), op.roles(), op.peer()) - - def _apply_rules(self, pv: 
SharedPV, op: ServerOperation) -> bool: - """ - Apply the rules, usually when a put operation is attempted - """ - for rule_name, put_rule in self._put_rules.items(): - logger.debug('Applying rule %s', rule_name) - rule_flow = put_rule(pv, op) - - # Originally a more elegant match (rule_flow): but we need - # to support versions of Python prior to 3.10 - if not rule_flow: - logger.warning( - "Rule %s did not return rule flow. Defaulting to " - "CONTINUE, but this behaviour may change in " - "future.", - rule_name, - ) - elif rule_flow == RulesFlow.CONTINUE: - pass - elif rule_flow == RulesFlow.ABORT: - logger.debug("Rule %s triggered handler abort", rule_name) - - errormsg = None - if rule_flow.error: - errormsg = rule_flow.error - - op.done(error=errormsg) - return False - elif rule_flow == RulesFlow.TERMINATE: - logger.debug("Rule %s triggered handler terminate", rule_name) - self._put_rules['timestamp'](pv, op) - break - elif rule_flow == RulesFlow.TERMINATE_WO_TIMESTAMP: - logger.debug("Rule %s triggered handler terminate without timestamp", rule_name) - break - else: - logger.critical("Rule %s returned unhandled return type", rule_name) - raise TypeError( - f"Rule {rule_name} returned unhandled return type {type(rule_flow)}" - ) - - return True - - def set_read_only(self, read_only: bool = True): - """ - Make this PV read only. 
- If read_only == False then the PV is made writable - """ - if read_only: - # Switch on the read-only rule and make sure it's the first rule - self._put_rules["read_only"] = ( - lambda new, old: RulesFlow(RulesFlow.ABORT).set_errormsg("read only PV") - ) - self._put_rules.move_to_end("read_only", last=False) - else: - # Switch off the read-only rule by deleting it - self._put_rules.pop("read_only", None) - - def _timestamp_rule(self, _, op: ServerOperation) -> RulesFlow: - """Handle updating the timestamps""" - - # Note that timestamps are not automatically handled so we may need to set them ourselves - newpvstate: Value = op.value().raw - self.evaluate_timestamp(_, newpvstate) - - return RulesFlow.CONTINUE - - def evaluate_timestamp(self, _ : dict, newpvstate : Value) -> Value: - """ Update the timeStamp of a PV """ - if newpvstate.changed("timeStamp"): - logger.debug("Using timeStamp from put operation") - else: - logger.debug("Generating timeStamp from time.time()") - - timenow = time.time() - seconds = int(timenow // 1) - nanoseconds = int((timenow % 1) * 1e9) - - newpvstate["timeStamp.secondsPastEpoch"] = seconds - newpvstate["timeStamp.nanoseconds"] = nanoseconds - - return newpvstate - - def _combined_pvstates( - self, oldpvstate: Value, newpvstate: Value, interests: str | list[str] - ) -> dict: - """ - Combine the current state of the PV and that in progress from a - ServerOperation, extracting the specified list of interests (e.g. - "control", "valueAlarm", etc.). Note that value is always included. - The merger prioritises infromation from the ServerOperation, using - the current state of the PV to fill in any information not in the - ServerOperation. - A dictionary with the merged interests is returned. - """ - - # This is complicated! We may need to process alarms based on either - # the oldstate or the newstate of the PV. Suppose, for example, the - # valueAlarm limits have all been set in the PV but it is not yet active. 
- # Now a value change and valueAlarms.active=True comes in. We have to - # act on the new value of the PV (and its active state) but using the - # old values for the limits! - # NOTE: We can get away without deepcopies because we never change any - # of these values - # TODO: What if valueAlarm has been added or removed? - - def extract_combined_value(newpvstate, oldpvstate, key): - """Check a key. If it isn't marked as changed return the old PV state value, - and if it is return the new PV state value - """ - if newpvstate.changed(key): - return newpvstate[key] - - return oldpvstate[key] - - combinedvals = {} - combinedvals["value"] = extract_combined_value(newpvstate, oldpvstate, "value") - - if isinstance(interests, str): - interests = [interests] - - for interest in interests: - combinedvals[interest] = extract_combined_value( - newpvstate, oldpvstate, interest - ) - for key in newpvstate[interest]: - fullkey = f"{interest}.{key}" - combinedvals[fullkey] = extract_combined_value( - newpvstate, oldpvstate, fullkey - ) - - return combinedvals - -class NTScalarRulesHandler(BaseRulesHandler): - """ - Rules handler for NTScalar PVs. - """ - def __init__(self) -> None: - super().__init__() - - self._init_rules.update({ - 'control' : self.evaluate_control_limits, - 'alarm_limit' : self.evaluate_alarm_limits - }) - - self._put_rules["control"] = self._controls_rule - self._put_rules["alarm_limit"] = self._alarm_limit_rule - self._put_rules.move_to_end("timestamp") - - def _controls_rule(self, pv: SharedPV, op: ServerOperation) -> RulesFlow: - """Check whether control limits should trigger and restrict values appropriately""" - logger.debug("Evaluating control limits") - - oldpvstate: Value = pv.current().raw - newpvstate: Value = op.value().raw - - # Check if there are any controls! 
- if "control" not in newpvstate and "control" not in oldpvstate: - logger.debug("control not present in structure") - return RulesFlow.CONTINUE - - combinedvals = self._combined_pvstates(oldpvstate, newpvstate, "control") - - # Check minimum step first - if ( - abs(newpvstate["value"] - oldpvstate["value"]) - < combinedvals["control.minStep"] - ): - logger.debug(" None | int | Numeric: - """ Check whether a value should be clipped by the control limits """ - - if not 'control' in combinedvals: - # logger.debug("control not present in structure") - return None - - # A philosophical question! What should we do when lowLimit = highLimit = 0? - # This almost certainly means the structure hasn't been initialised, but it could - # be an attempt (for some reason) to lock the value to 0. For now we treat this - # as uninitialised and ignore limits in this case. Users will have to handle - # keeping the PV constant at 0 themselves - if ( - combinedvals["control.limitLow"] == 0 - and combinedvals["control.limitHigh"] == 0 - ): - # logger.info( - # "control.limitLow and control.LimitHigh set to 0, so ignoring control limits" - # ) - return None - - # Check lower and upper control limits - if combinedvals["value"] < combinedvals["control.limitLow"]: - value = combinedvals["control.limitLow"] - # logger.debug("Lower control limit exceeded") - return value - - if combinedvals["value"] > combinedvals["control.limitHigh"]: - value = combinedvals["control.limitHigh"] - # logger.debug("Upper control limit exceeded") - return value - - return None - - def __alarm_state_check( - self, combinedvals: dict, newpvstate: Value, alarm_type: str, op=None - ) -> bool: - """Check whether the PV should be in an alarm state """ - if not op: - if alarm_type.startswith("low"): - op = operator.le - elif alarm_type.startswith("high"): - op = operator.ge - else: - raise SyntaxError( - f"CheckAlarms/alarmStateCheck: do not know how to handle {alarm_type}" - ) - - severity = 
combinedvals[f"valueAlarm.{alarm_type}Severity"] - if ( - op(combinedvals["value"], combinedvals[f"valueAlarm.{alarm_type}Limit"]) - and severity - ): - newpvstate["alarm.severity"] = severity - if not newpvstate.changed("alarm.message"): - newpvstate["alarm.message"] = alarm_type - - logger.debug( - "Setting %s to severity %i",# with message '%s'", - self._name, severity#, newpvstate['alarm.message'] - ) - - return True - - return False - - def _alarm_limit_rule(self, pv: SharedPV, op: ServerOperation) -> RulesFlow: - """ Evaluate alarm limits to see if we should change severity or message""" - oldpvstate: Value = pv.current().raw - newpvstate: Value = op.value().raw - - # Check if there are alarms are present in the structure! - if "alarm" not in newpvstate and "alarm" not in oldpvstate: - logger.debug("alarm not present in structure") - return RulesFlow.CONTINUE - - # Check if valueAlarms are present - if "valueAlarm" not in newpvstate and "valueAlarm" not in oldpvstate: - logger.debug("valueAlarm not present in structure") - return RulesFlow.CONTINUE - - combinedvals = self._combined_pvstates(oldpvstate, newpvstate, ["valueAlarm", "alarm"]) - - self.evaluate_alarm_limits(combinedvals, newpvstate) - return RulesFlow.CONTINUE - - def evaluate_alarm_limits(self, combinedvals, pvstate : Value) -> None | Value: - """ Evaluate alarm value limits """ - # TODO: Apply the rule for hysteresis. Unfortunately I don't understand the - # explanation in the Normative Types specification... - - if 'valueAlarm' not in combinedvals: - logger.debug("valueAlarm not present in structure") - return None - - # Check if valueAlarms are present and active! 
- if not combinedvals["valueAlarm.active"]: - logger.debug("valueAlarm not active") - return None - - logger.debug("Processing valueAlarm for %s", self._name) - - try: - # The order of these tests is defined in the Normative Types document - if self.__alarm_state_check(combinedvals, pvstate, "highAlarm"): - return pvstate - if self.__alarm_state_check(combinedvals, pvstate, "lowAlarm"): - return pvstate - if self.__alarm_state_check(combinedvals, pvstate, "highWarning"): - return pvstate - if self.__alarm_state_check(combinedvals, pvstate, "lowWarning"): - return pvstate - except SyntaxError: - # TODO: Need more specific error than SyntaxError - return None - - # If we made it here then there are no alarms or warnings and we need to indicate that - # possibly by resetting any existing ones - #combinedvals = self._combined_pvstates(oldpvstate, newpvstate, "alarm") - alarms_changed = False - if combinedvals["alarm.severity"]: - pvstate["alarm.severity"] = 0 - alarms_changed = True - if combinedvals["alarm.message"]: - pvstate["alarm.message"] = "" - alarms_changed = True - - if alarms_changed: - logger.debug( - "Setting %s to severity %i with message '%s'", - self._name, pvstate['alarm.severity'], pvstate['alarm.message'] - ) - return pvstate - - logger.debug("Made no automatic changes to alarm state of %s", self._name) - return None diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py index 2629f780..344099de 100644 --- a/src/p4p/server/raw.py +++ b/src/p4p/server/raw.py @@ -39,6 +39,12 @@ class Handler(object): Use of this as a base class is optional. """ + def open(self, value): + """ + Called each time an Open operation is performed on this Channel + + :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). + """ def put(self, pv, op): """ @@ -50,6 +56,16 @@ def put(self, pv, op): """ op.done(error='Not supported') + def post(self, pv, value): + """ + Called each time a client issues a post + operation on this Channel. 
+ + :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. + :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). + """ + pass + def rpc(self, pv, op): """ Called each time a client issues a Remote Procedure Call @@ -76,6 +92,13 @@ def onLastDisconnect(self, pv): """ pass + def close(self, pv): + """ + Called when the Channel is closed. + + :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. + """ + pass class SharedPV(_SharedPV): @@ -154,10 +177,22 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): self._wrap = wrap or (nt and nt.wrap) or self._wrap self._unwrap = unwrap or (nt and nt.unwrap) or self._unwrap + evaluate_rules = kws.pop("use_handler_open", True) + try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + # Apply rules unless they've been switched off + if evaluate_rules: + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.open(V) + except AttributeError as err: + pass + _SharedPV.open(self, V) def post(self, value, **kws): @@ -170,12 +205,43 @@ def post(self, value, **kws): Any keyword arguments are forwarded to the NT wrap() method (if applicable). Common arguments include: timestamp= , severity= , and message= . 
""" + evaluate_rules = kws.pop("use_handler_post", True) + try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + # Apply rules unless they've been switched off + if evaluate_rules: + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.post(self, V) + except AttributeError: + pass + _SharedPV.post(self, V) + def close(self, destroy=False, sync=False, timeout=None): + """Close PV, disconnecting any clients. + + :param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open(). + :param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies). + :param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value. + + close() with destory=True or sync=True will not prevent clients from re-connecting. + New clients may prevent sync=True from succeeding. + Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`, + or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV. 
+ """ + try: + self._handler.close(self) + except AttributeError: + pass + + _SharedPV.close(self) + def current(self): V = _SharedPV.current(self) try: @@ -208,7 +274,15 @@ def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real + def open(self, value): + _log.debug('OPEN %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.open, value) + except AttributeError: + pass + def onFirstConnect(self): + _log.debug('ONFIRSTCONNECT %s', self._pv) self._pv._exec(None, self._pv._onFirstConnect, None) try: # user handler may omit onFirstConnect() M = self._real.onFirstConnect @@ -217,6 +291,7 @@ def onFirstConnect(self): self._pv._exec(None, M, self._pv) def onLastDisconnect(self): + _log.debug('ONLASTDISCONNECT %s', self._pv) try: M = self._real.onLastDisconnect except AttributeError: @@ -239,33 +314,76 @@ def rpc(self, op): except AttributeError: op.done(error="RPC not supported") + def post(self, value): + _log.debug('POST %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.rpc, self._pv, value) + except AttributeError: + pass + + def close(self, pv): + _log.debug('CLOSE %s', self._pv) + try: + self._pv._exec(None, self._real.close, self._pv) + except AttributeError: + pass + @property - def onFirstConnect(self): + def on_open(self): + def decorate(fn): + self._handler.open = fn + return fn + return decorate + + @property + def on_first_connect(self): def decorate(fn): self._handler.onFirstConnect = fn return fn return decorate @property - def onLastDisconnect(self): + def on_last_disconnect(self): def decorate(fn): self._handler.onLastDisconnect = fn return fn return decorate @property - def put(self): + def on_put(self): def decorate(fn): self._handler.put = fn return fn return decorate @property - def rpc(self): + def on_rpc(self): def decorate(fn): self._handler.rpc = fn return fn return decorate + + @property + def on_post(self): + def 
decorate(fn): + self._handler.post = fn + return fn + return decorate + + @property + def on_close(self): + def decorate(fn): + self._handler.close = fn + return fn + return decorate + + # Aliases for decorators to maintain consistent new style + # Required because post is already used and on_post seemed the best + # alternative. + put = on_put + rpc = on_rpc + onFirstConnect = on_first_connect + onLastDisconnect = on_last_disconnect def __repr__(self): if self.isOpen(): diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py index a810ba32..d07a7afd 100644 --- a/src/p4p/test/test_sharedpv.py +++ b/src/p4p/test/test_sharedpv.py @@ -9,6 +9,8 @@ import inspect import threading +from p4p.server.raw import Handler + try: from Queue import Queue, Full, Empty except ImportError: @@ -149,6 +151,144 @@ def testMonitor(self): gc.collect() self.assertIsNone(C()) +class TestNewHandler(RefTestCase): + maxDiff = 1000 + timeout = 1.0 + + class Times3Handler(Handler): + # Note that the prototypes of open() and post() return None and here + # we have them returning bool. 
A more rigorous solution might use + # Exceptions instead, which would also allow an error message to be + # passed to the client via the op.done() + def open(self, value): + if value.changed('value'): + if value["value"] < 0: + value.unmark() + return False + value["value"] = value["value"] * 3 + + return True + + def post(self, pv, value): + return self.open(value) + + def put(self, pv, op): + if not self.post(pv, op.value().raw): + op.done(error="Must be non-negative") + pv.post(op.value(),use_handler_post=False) + op.done() + + def setUp(self): + # gc.set_debug(gc.DEBUG_LEAK) + super(TestNewHandler, self).setUp() + + self.pv = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d')) + self.pv2 = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d'), initial=42.0) + self.sprov = StaticProvider("serverend") + self.sprov.add('foo', self.pv) + self.sprov.add('bar', self.pv2) + + self.server = Server(providers=[self.sprov], isolate=True) + _log.debug('Server Conf: %s', self.server.conf()) + + def tearDown(self): + self.server.stop() + _defaultWorkQueue.sync() + #self.pv._handler._pv = None + R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)] + r = None + del self.server + del self.sprov + del self.pv + del self.pv2 + gc.collect() + R = [r() for r in R] + self.assertListEqual(R, [None] * len(R)) + super(TestNewHandler, self).tearDown() + + def testCurrent(self): + self.pv.open(1.0) + self.assertEqual(self.pv.current(), 3.0) + + def testGet(self): + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: + _log.debug('Client conf: %s', ctxt.conf()) + # PV not yet opened + self.assertRaises(TimeoutError, ctxt.get, 'foo', timeout=0.1) + + self.pv.open(1.0) + + V = ctxt.get('foo') + self.assertEqual(V, 3.0) + self.assertTrue(V.raw.changed('value')) + + self.assertEqual(ctxt.get(['foo', 'bar']), [1.0 * 3, 42.0 * 3]) + + C = weakref.ref(ctxt) + del ctxt + gc.collect() + self.assertIsNone(C()) + + def 
testPutGet(self): + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: + + self.pv.open(1.0) + + V = ctxt.get('foo') + self.assertEqual(V, 3.0) + + ctxt.put('foo', 5) + + V = ctxt.get('foo') + self.assertEqual(V, 15.0) + + ctxt.put(['foo', 'bar'], [5, 6]) + + self.assertEqual(ctxt.get(['foo', 'bar']), [5 * 3, 6 * 3]) + + C = weakref.ref(ctxt) + del ctxt + gc.collect() + self.assertIsNone(C()) + + def testMonitor(self): + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: + + self.pv.open(1.0) + + Q = Queue(maxsize=4) + sub = ctxt.monitor('foo', Q.put, notify_disconnect=True) + + V = Q.get(timeout=self.timeout) + self.assertIsInstance(V, Disconnected) + + V = Q.get(timeout=self.timeout) + self.assertEqual(V, 3.0) + + ctxt.put('foo', 4) + + V = Q.get(timeout=self.timeout) + self.assertEqual(V, 12.0) + + self.pv.close() + + V = Q.get(timeout=self.timeout) + self.assertIsInstance(V, Disconnected) + + self.pv.open(3.0) + ctxt.hurryUp() + + V = Q.get(timeout=self.timeout) + self.assertEqual(V, 9.0) + + C = weakref.ref(ctxt) + del ctxt + del sub + del Q + gc.collect() + self.assertIsNone(C()) + + class TestRPC(RefTestCase): maxDiff = 1000 timeout = 1.0 @@ -235,6 +375,7 @@ def test_rpc_error(self): with self.assertRaisesRegex(RemoteError, 'oops'): ret = C.rpc('foo', args.wrap('foo', kws={'oops':True})) + class TestRPC2(TestRPC): openclose = True From fecb939de8a46a6bbfd282136e47e204c9e5f324 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sat, 24 Aug 2024 12:31:50 +0100 Subject: [PATCH 04/10] Add another example showing how a handler.post can allow persistence --- example/ntscalar_walarm.py | 19 ------ example/persist.py | 136 +++++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 19 deletions(-) delete mode 100644 example/ntscalar_walarm.py create mode 100644 example/persist.py diff --git a/example/ntscalar_walarm.py b/example/ntscalar_walarm.py deleted file mode 100644 index fef921c0..00000000 --- 
a/example/ntscalar_walarm.py +++ /dev/null @@ -1,19 +0,0 @@ -import logging -logger = logging.getLogger(__name__) - -from p4p.nt import NTScalar -from p4p.server import Server -from p4p.server.thread import SharedPV -from p4p.server.nthandlers import NTScalarRulesHandler - -# Construct PV with control and valueAlarm structures -# and then set the values of some of those values with a post -pv = SharedPV(nt=NTScalar('d', control=True, valueAlarm=True), - handler=NTScalarRulesHandler(), - initial=12.0) -pv.post({'control.limitHigh': 6, - 'valueAlarm.active': True, 'valueAlarm.lowAlarmLimit': 1, 'valueAlarm.lowAlarmSeverity':2}) - -Server.forever(providers=[{ - 'demo:pv:name':pv, # PV name only appears here -}]) diff --git a/example/persist.py b/example/persist.py new file mode 100644 index 00000000..969ca04b --- /dev/null +++ b/example/persist.py @@ -0,0 +1,136 @@ +""" +Use a handler to automatically persist values to an SQLite3 file database. +Any values persisted this way will be automatically restored when the +program is rerun. + +There are a number of caveats for this simple demo. +- The persist_handler will not work as expected if anything other than the + value of a field is changed, e.g. if a Control field was added to an NTScalar + if would not be persisted correctly. This could be resolved by correctly + merging the pv.current() and value appropriately. +- Duplicate PVs are not supported. This would require more complicated helper + functions for setting up the SharedPVs. +""" + +import json +import random +import sqlite3 +import time +from typing import Any, Tuple + +from p4p import Value +from p4p.nt.scalar import NTScalar +from p4p.server import Server, ServerOperation +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class persist_handler(Handler): + """ + A handler that will allow simple persistence of values and timestamps + across retarts. It requires a post handler in order to persist values + set within the program. 
+ """ + def __init__(self, pv_name: str, conn: sqlite3.Connection): + self._conn = conn + self._pv_name = pv_name + + def post(self, pv: SharedPV, value: Value): + # Always update the timestamp + if value.changed(): + now = time.time() + value["timeStamp.secondsPastEpoch"] = int(now // 1) + value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) + + # Persist the data + val_json = json.dumps(value.todict()) + + cur = self._conn.cursor() + cur.execute( + """ + INSERT INTO pvs (id, data) VALUES (?, ?) + ON CONFLICT(id) + DO UPDATE set data = ?; + """, + [self._pv_name, val_json, val_json], + ) + conn.commit() + cur.close() + + def put(self, pv: SharedPV, op: ServerOperation): + pv.post(op.value()) # Triggers the post() above + op.done() + + +# Helper functions for restoring values on program startup +def get_initial(pv_name: str, conn: sqlite3.Connection, default=None) -> Tuple[Any, Any]: + """ + Retrieve the initial value from the SQLite database and if there isn't + a value then return a default value instead + """ + + cur = conn.cursor() + res = cur.execute("SELECT data FROM pvs WHERE id=?", [pv_name]) + query_val = res.fetchone() + cur.close() + + if query_val is not None: + json_val = json.loads(query_val[0]) + print(f"Will restore to {pv_name} value: {json_val['value']}") + return json_val["value"], json_val + + return default, None + + +def setup_pv(name: str, type: str, conn: sqlite3.Connection, default=None) -> SharedPV: + """ + Setting up these PVs with the handler and restoring their values is + somewhat complex! 
+ """ + initial, full_result = get_initial(name, conn, default) + + timestamp = None + if full_result: + if full_result.get("timeStamp"): + timestamp = ( + full_result["timeStamp"]["secondsPastEpoch"] + + full_result["timeStamp"]["nanoseconds"] / 1e9 + ) + + return SharedPV( + nt=NTScalar(type), + handler=persist_handler(name, conn), + initial=initial, + timestamp=timestamp, + ) + + +# Create an SQLite dayabase to function as our persistence store +conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) +cur = conn.cursor() +cur.execute( + "CREATE TABLE IF NOT EXISTS pvs (id varchar(255), data json, PRIMARY KEY (id));" +) +cur.close() + +pvs = { + "demo:pv:randint": setup_pv("demo:pv:randint", "i", conn, default=-1), + "demo:pv:int": setup_pv("demo:pv:int", "i", conn, default=12), + "demo:pv:float": setup_pv("demo:pv:float", "d", conn, default=9.99), + "demo:pv:string": setup_pv("demo:pv:string", "s", conn, default="Hello!"), +} + +print(f"Starting server with the following PVs: {pvs.keys()}") + +server = None +try: + server = Server(providers=[pvs]) + while True: + time.sleep(1) + pvs["demo:pv:randint"].post(random.randint(1, 1000)) +except KeyboardInterrupt: + pass +finally: + if server: + server.stop() + conn.close() From 6b678396e341e40e1695ccf15ac14da546981f7a Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sat, 24 Aug 2024 12:42:46 +0100 Subject: [PATCH 05/10] Duplicate pvs do work --- example/persist.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/example/persist.py b/example/persist.py index 969ca04b..895ab5f6 100644 --- a/example/persist.py +++ b/example/persist.py @@ -3,13 +3,11 @@ Any values persisted this way will be automatically restored when the program is rerun. -There are a number of caveats for this simple demo. -- The persist_handler will not work as expected if anything other than the - value of a field is changed, e.g. 
if a Control field was added to an NTScalar - if would not be persisted correctly. This could be resolved by correctly - merging the pv.current() and value appropriately. -- Duplicate PVs are not supported. This would require more complicated helper - functions for setting up the SharedPVs. +There are is an important caveat for this simple demo: +The persist_handler will not work as expected if anything other than the +value of a field is changed, e.g. if a Control field was added to an NTScalar +if would not be persisted correctly. This could be resolved by correctly +merging the pv.current() and value appropriately. """ import json @@ -113,11 +111,13 @@ def setup_pv(name: str, type: str, conn: sqlite3.Connection, default=None) -> Sh ) cur.close() +duplicate_pv = setup_pv("demo:pv:int", "i", conn, default=12) pvs = { "demo:pv:randint": setup_pv("demo:pv:randint", "i", conn, default=-1), - "demo:pv:int": setup_pv("demo:pv:int", "i", conn, default=12), + "demo:pv:int": duplicate_pv, "demo:pv:float": setup_pv("demo:pv:float", "d", conn, default=9.99), "demo:pv:string": setup_pv("demo:pv:string", "s", conn, default="Hello!"), + "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore } print(f"Starting server with the following PVs: {pvs.keys()}") From c2a7f989a2c34314d5fa6bba2c235aa73a4de4d3 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sun, 25 Aug 2024 17:18:53 +0100 Subject: [PATCH 06/10] Minor formatting changes --- example/auditor.py | 15 +++++++-------- example/ntscalar_control.py | 6 ++++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/example/auditor.py b/example/auditor.py index db9ed515..48b1680a 100644 --- a/example/auditor.py +++ b/example/auditor.py @@ -14,7 +14,7 @@ from p4p.server.thread import SharedPV -class auditor(Handler): +class Auditor(Handler): """Persist information to file so we can audit when the program is closed""" def open(self, value): @@ -30,7 +30,7 @@ def close(self, pv): f.write(f"Auditing 
closed at {time.ctime()}; no changes made\n") -class audited(Handler): +class Audited(Handler): """Forward information about Put operations to the auditing PV""" def __init__(self, pv: SharedPV): @@ -46,26 +46,25 @@ def put(self, pv, op): # Setup the PV that will make the audit information available. # Note that there is no put in its handler so it will be externally read-only -auditor_pv = SharedPV(nt=NTScalar("s"), handler=auditor(), initial="") +auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="") # Setup some PVs that will be audited and one that won't be # Note that the audited handler does have a put so these PVs can be changed externally pvs = { "demo:pv:auditor": auditor_pv, "demo:pv:audited_d": SharedPV( - nt=NTScalar("d"), handler=audited(auditor_pv), initial=9.99 + nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99 ), "demo:pv:audited_i": SharedPV( - nt=NTScalar("i"), handler=audited(auditor_pv), initial=4 + nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4 ), "demo:pv:audited_s": SharedPV( - nt=NTScalar("s"), handler=audited(auditor_pv), initial="Testing" + nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing" ), "demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1), } -print(pvs.keys()) -server = None +print(pvs.keys()) try: Server.forever(providers=[pvs]) except KeyboardInterrupt: diff --git a/example/ntscalar_control.py b/example/ntscalar_control.py index 34c093fa..55286430 100644 --- a/example/ntscalar_control.py +++ b/example/ntscalar_control.py @@ -101,7 +101,9 @@ def put(self, pv, op): return else: if op.value().raw.changed("control"): - errmsg = f"Unauthorised attempt to set Control by {op.account()}" + errmsg = ( + f"Unauthorised attempt to set Control by {op.account()}" + ) op.value().raw.mark("control", False) # Because we have not set use_handler_post=False in the post this @@ -125,7 +127,7 @@ def put(self, pv, op): ) -@pv.put +@pv.on_put def handle(pv, op): pv.post(op.value()) # just store 
and update subscribers op.done() From 9aa3a0a249590fb912d6873052526cd7e9a6c232 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sun, 25 Aug 2024 17:19:35 +0100 Subject: [PATCH 07/10] Change to how handler arguments are treated --- src/p4p/server/raw.py | 52 ++++---- src/p4p/test/test_sharedpv.py | 233 ++++++++++++++++++++++++++++++++-- 2 files changed, 251 insertions(+), 34 deletions(-) diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py index 344099de..1219af47 100644 --- a/src/p4p/server/raw.py +++ b/src/p4p/server/raw.py @@ -39,7 +39,7 @@ class Handler(object): Use of this as a base class is optional. """ - def open(self, value): + def open(self, value, **kws): """ Called each time an Open operation is performed on this Channel @@ -56,13 +56,14 @@ def put(self, pv, op): """ op.done(error='Not supported') - def post(self, pv, value): + def post(self, pv, value, **kws): """ Called each time a client issues a post operation on this Channel. :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). + :param dict options: A dictionary of configuration options. 
""" pass @@ -177,21 +178,22 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): self._wrap = wrap or (nt and nt.wrap) or self._wrap self._unwrap = unwrap or (nt and nt.unwrap) or self._unwrap - evaluate_rules = kws.pop("use_handler_open", True) + # Intercept all arguments that start with 'handler_open_' and remove them from + # the arguments that go to the wrap and send them instead to the handler.open() + post_kws = {x: kws.pop(x) for x in [y for y in kws if y.startswith("handler_open_")]} try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) - # Apply rules unless they've been switched off - if evaluate_rules: - # Guard goes here because we can have handlers that don't inherit from - # the Handler base class - try: - self._handler.open(V) - except AttributeError as err: - pass + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.open(V, **post_kws) + except AttributeError as err: + pass _SharedPV.open(self, V) @@ -205,21 +207,21 @@ def post(self, value, **kws): Any keyword arguments are forwarded to the NT wrap() method (if applicable). Common arguments include: timestamp= , severity= , and message= . 
""" - evaluate_rules = kws.pop("use_handler_post", True) + # Intercept all arguments that start with 'handler_post_' and remove them from + # the arguments that go to the wrap and send them instead to the handler.post() + post_kws = {x: kws.pop(x) for x in [y for y in kws if y.startswith("handler_post_")]} try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) - - # Apply rules unless they've been switched off - if evaluate_rules: - # Guard goes here because we can have handlers that don't inherit from - # the Handler base class - try: - self._handler.post(self, V) - except AttributeError: - pass + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.post(self, V, **post_kws) + except AttributeError: + pass _SharedPV.post(self, V) @@ -274,10 +276,10 @@ def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real - def open(self, value): + def open(self, value, **kws): _log.debug('OPEN %s %s', self._pv, value) try: - self._pv._exec(None, self._real.open, value) + self._pv._exec(None, self._real.open, value, **kws) except AttributeError: pass @@ -314,10 +316,10 @@ def rpc(self, op): except AttributeError: op.done(error="RPC not supported") - def post(self, value): + def post(self, value, **kws): _log.debug('POST %s %s', self._pv, value) try: - self._pv._exec(None, self._real.rpc, self._pv, value) + self._pv._exec(None, self._real.rpc, self._pv, value, **kws) except AttributeError: pass diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py index d07a7afd..ea3cb071 100644 --- a/src/p4p/test/test_sharedpv.py +++ b/src/p4p/test/test_sharedpv.py @@ -3,14 +3,13 @@ import logging import unittest import random +from unittest.mock import patch import weakref import sys import gc import inspect 
import threading -from p4p.server.raw import Handler - try: from Queue import Queue, Full, Empty except ImportError: @@ -19,6 +18,7 @@ from ..wrapper import Value, Type from ..client.thread import Context, Disconnected, TimeoutError, RemoteError from ..server import Server, StaticProvider +from ..server.raw import Handler from ..server.thread import SharedPV, _defaultWorkQueue from ..util import WorkQueue from ..nt import NTScalar, NTURI @@ -151,7 +151,7 @@ def testMonitor(self): gc.collect() self.assertIsNone(C()) -class TestNewHandler(RefTestCase): +class TestGPMNewHandler(RefTestCase): maxDiff = 1000 timeout = 1.0 @@ -169,18 +169,21 @@ def open(self, value): return True - def post(self, pv, value): + def post(self, pv, value, **kws): + if not kws.pop("handler_post_enable", True): + return + return self.open(value) def put(self, pv, op): if not self.post(pv, op.value().raw): op.done(error="Must be non-negative") - pv.post(op.value(),use_handler_post=False) + pv.post(op.value(),handler_post_enable=False) op.done() def setUp(self): # gc.set_debug(gc.DEBUG_LEAK) - super(TestNewHandler, self).setUp() + super(TestGPMNewHandler, self).setUp() self.pv = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d')) self.pv2 = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d'), initial=42.0) @@ -204,11 +207,11 @@ def tearDown(self): gc.collect() R = [r() for r in R] self.assertListEqual(R, [None] * len(R)) - super(TestNewHandler, self).tearDown() + super(TestGPMNewHandler, self).tearDown() def testCurrent(self): - self.pv.open(1.0) - self.assertEqual(self.pv.current(), 3.0) + self.pv.open(1.0) + self.assertEqual(self.pv.current(), 3.0) def testGet(self): with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: @@ -289,6 +292,218 @@ def testMonitor(self): self.assertIsNone(C()) +class TestNewHandler(RefTestCase): + maxDiff = 1000 + timeout = 1.0 + + class ValueChangeHandler(Handler): + """Check whether calls work as expected""" + put_use_handler_post = True + + 
def open(self, value, **kwargs): + if not kwargs.pop("handler_open_enable", True): + return + + value["value"] = 1.1 + + def onFirstConnect(self, pv): + # Not intended to allow changes to pv? + pass + + def onLastDisconnect(self, pv): + # Not intended to allow changes to pv? + pass + + def post(self, pv, value, **kwargs): + if not kwargs.pop("handler_post_enable", True): + return + + value["value"] = 2.2 + + def put(self, pv, op): + op.value().raw["value"] = 3.3 + pv.post(op.value(), handler_post_enable=self.put_use_handler_post) + op.done() + + def close(self, pv): + # Not intended to allow changes to pv? + pass + + def setUp(self): + # gc.set_debug(gc.DEBUG_LEAK) + super(TestNewHandler, self).setUp() + + self.pv = SharedPV(nt=NTScalar('d'), handler=self.ValueChangeHandler()) + self.pv2 = SharedPV(nt=NTScalar('d'), initial=42.0) # No handler + self.sprov = StaticProvider("serverend") + self.sprov.add('foo', self.pv) + self.sprov.add('bar', self.pv2) + + self.server = Server(providers=[self.sprov], isolate=True) + _log.debug('Server Conf: %s', self.server.conf()) + + def tearDown(self): + self.server.stop() + _defaultWorkQueue.sync() + #self.pv._handler._pv = None + R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)] + r = None + del self.server + del self.sprov + del self.pv + del self.pv2 + gc.collect() + R = [r() for r in R] + self.assertListEqual(R, [None] * len(R)) + super(TestNewHandler, self).tearDown() + + def test_open(self): + # Note that the mock.patch changes the ValueChangeHandler.open() into a noop + with patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.open') as mock_open: + # Test handler.open() calls processed correctly + self.pv.open(1.0) + self.assertEqual(self.pv.current(), 1.0) + mock_open.assert_called_once() + mock_open.reset_mock() + + # Test handler.open() calls processed correctly after close() + self.pv.close() + self.pv.open(2.0) + self.assertEqual(self.pv.current(), 2.0) + 
mock_open.assert_called_once() + mock_open.reset_mock() + + # Test nothing goes wrong when we have no handler set in the SharedPV + self.pv2.close() + self.pv2.open(1.0) + self.assertEqual(self.pv2.current(), 1.0) + mock_open.assert_not_called() + mock_open.reset_mock() + + # Check that value changes in a handler happen correctly + self.pv.close() + self.pv.open(55.0) + self.assertEqual(self.pv.current(), 1.1) + + # Check that handler_open_ arguments are passed correctly + self.pv.close() + self.pv.open(33.0, handler_open_enable=False) + self.assertEqual(self.pv.current(), 33.0) + + def test_post(self): + # Note that the mock.patch changes the ValueChangeHandler.post() into a noop + with patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.post') as mock_post: + # Test handler.open() calls processed correctly; again patch means our function isn't called + self.pv.open(1.0) + self.pv.post(5.0) + mock_post.assert_called_once() + self.assertEqual(self.pv.current(), 5.0) + mock_post.reset_mock() + + # Test nothing goes wrong when we have no handler set in the SharedPV + self.pv2.post(6.0) + mock_post.assert_not_called() + self.assertEqual(self.pv2.current(), 6.0) + mock_post.reset_mock() + + # Check that value changes in a handler happen correctly + self.pv.close() + self.pv.open(1.0) + self.pv.post(9.9) + self.assertEqual(self.pv.current(), 2.2) + + # Check that handler_post_ arguments are passed correctly + self.pv.close() + self.pv.open(1.0) + self.pv.post(77.0, handler_post_enable=False) + self.assertEqual(self.pv.current(), 77.0) + + def test_get(self): + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: + _log.debug('Client conf: %s', ctxt.conf()) + # PV not yet opened + self.assertRaises(TimeoutError, ctxt.get, 'foo', timeout=0.1) + + self.pv.open(1.0) + + V = ctxt.get('foo') + self.assertEqual(V, 1.1) + self.assertTrue(V.raw.changed('value')) + + self.assertEqual(ctxt.get(['foo', 'bar']), [1.1, 42.0]) + + C = weakref.ref(ctxt) + del 
ctxt + gc.collect() + self.assertIsNone(C()) + + def test_put_get(self): + # Test handler.put() is called as expected and nothing breaks if it is absent + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt, \ + patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.put') as mock_put: + + # Check the handler.put() is called + self.pv.open(1.0) + + V = ctxt.get('foo') + self.assertEqual(V, 1.1) + + # This will timeout as the patch has rendered the handler.post() a noop + # and so we'll never issue an op.done() but we can still check it was called + self.assertRaises(TimeoutError, ctxt.put, 'foo', 5, timeout=0.1) + mock_put.assert_called_once() + mock_put.reset_mock() + + # Check that the new code does not affect the operation of a SharedPV with no handler + # This will also timeout as a PV without handler doesn't allow puts! + V = ctxt.get('bar') + self.assertEqual(V, 42.0) + + self.assertRaises(RemoteError, ctxt.put, 'bar', 5, timeout=0.1) + mock_put.assert_not_called() + + C = weakref.ref(ctxt) + del ctxt + gc.collect() + self.assertIsNone(C()) + + # Test interaction with handler.post() + self.pv.close() + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt, \ + patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.post') as mock_post: + + self.pv.open(1.0) + V = ctxt.get('foo') + self.assertEqual(V, 1.1) + + ctxt.put('foo', 15) + + self.assertEqual(ctxt.get('foo'), 3.3) + mock_post.assert_called_once() + + C = weakref.ref(ctxt) + del ctxt + gc.collect() + self.assertIsNone(C()) + + # Test interaction with handler.post() without any patches to interfere + self.pv.close() + with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: + + self.pv.open(1.0) + V = ctxt.get('foo') + self.assertEqual(V, 1.1) + + ctxt.put('foo', 15) + + self.assertEqual(ctxt.get('foo'), 2.2) + mock_post.assert_called_once() + + C = weakref.ref(ctxt) + del ctxt + gc.collect() + self.assertIsNone(C()) + class TestRPC(RefTestCase): 
maxDiff = 1000 timeout = 1.0 From 706602f2761c5fcc9ac01d007e65d581f2e7b282 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Sun, 25 Aug 2024 17:20:06 +0100 Subject: [PATCH 08/10] Most of the restore work can be done in the handler open() --- example/persist.py | 187 +++++++++++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 75 deletions(-) diff --git a/example/persist.py b/example/persist.py index 895ab5f6..f54c8d99 100644 --- a/example/persist.py +++ b/example/persist.py @@ -1,20 +1,25 @@ """ Use a handler to automatically persist values to an SQLite3 file database. -Any values persisted this way will be automatically restored when the -program is rerun. +Any values persisted this way will be automatically restored when the +program is rerun. The details of users who -There are is an important caveat for this simple demo: -The persist_handler will not work as expected if anything other than the +Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the +program while continuing to monitor the PV. Compare with the value of +`demo:pv:uptime` which resets on each program start. Try setting the value of +demo:pv:optime while continuing to monitor it. It is recommended to +inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`. + +There is an important caveat for this simple demo: +The `PersistHandler` will not work as expected if anything other than the value of a field is changed, e.g. if a Control field was added to an NTScalar -if would not be persisted correctly. This could be resolved by correctly -merging the pv.current() and value appropriately. +if would not be persisted correctly. This could be resolved by correctly +merging the pv.current().raw and value.raw appropriately in the post() +and put(). 
""" import json -import random import sqlite3 import time -from typing import Any, Tuple from p4p import Value from p4p.nt.scalar import NTScalar @@ -23,111 +28,143 @@ from p4p.server.thread import SharedPV -class persist_handler(Handler): +class PersistHandler(Handler): """ A handler that will allow simple persistence of values and timestamps across retarts. It requires a post handler in order to persist values set within the program. """ + def __init__(self, pv_name: str, conn: sqlite3.Connection): self._conn = conn self._pv_name = pv_name - def post(self, pv: SharedPV, value: Value): - # Always update the timestamp - if value.changed(): + def open(self, value, **kws): + # If there is a value already in the database we always use that + # instead of the supplied initial value, unless the + # handler_open_restore flag indicates otherwise. + if not kws.pop("handler_open_restore", True): + return + + res = self._conn.execute( + "SELECT data FROM pvs WHERE id=?", [self._pv_name] + ) + query_val = res.fetchone() + + if query_val is not None: + # We could, in theory, re-apply authentication here based on the + # account and peer information. 
+ json_val = json.loads(query_val[0]) + print(f"Will restore to {self._pv_name} value: {json_val['value']}") + + # Override initial value + value["value"] = json_val["value"] + + value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][ + "secondsPastEpoch" + ] + value["timeStamp.nanoseconds"] = json_val["timeStamp"][ + "nanoseconds" + ] + else: + self._upsert(value) + + def _update_timestamp(self, value): + if not value.changed("timeStamp"): now = time.time() - value["timeStamp.secondsPastEpoch"] = int(now // 1) + value["timeStamp.secondsPastEpoch"] = now // 1 value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) + def _upsert(self, value, account=None, peer=None) -> None: # Persist the data val_json = json.dumps(value.todict()) - cur = self._conn.cursor() - cur.execute( + # Use UPSERT: https://sqlite.org/lang_upsert.html + conn.execute( """ - INSERT INTO pvs (id, data) VALUES (?, ?) - ON CONFLICT(id) - DO UPDATE set data = ?; - """, - [self._pv_name, val_json, val_json], + INSERT INTO pvs (id, data, account, peer) + VALUES (:name, :json_data, :account, :peer) + ON CONFLICT(id) + DO UPDATE SET data = :json_data, account = :account, peer = :peer; + """, + { + "name": self._pv_name, + "json_data": val_json, + "account": account, + "peer": peer, + }, ) conn.commit() - cur.close() - - def put(self, pv: SharedPV, op: ServerOperation): - pv.post(op.value()) # Triggers the post() above - op.done() - - -# Helper functions for restoring values on program startup -def get_initial(pv_name: str, conn: sqlite3.Connection, default=None) -> Tuple[Any, Any]: - """ - Retrieve the initial value from the SQLite database and if there isn't - a value then return a default value instead - """ - cur = conn.cursor() - res = cur.execute("SELECT data FROM pvs WHERE id=?", [pv_name]) - query_val = res.fetchone() - cur.close() + def post(self, pv: SharedPV, value: Value, **kwargs): + self._update_timestamp(value) - if query_val is not None: - json_val = json.loads(query_val[0]) - 
print(f"Will restore to {pv_name} value: {json_val['value']}") - return json_val["value"], json_val - - return default, None - - -def setup_pv(name: str, type: str, conn: sqlite3.Connection, default=None) -> SharedPV: - """ - Setting up these PVs with the handler and restoring their values is - somewhat complex! - """ - initial, full_result = get_initial(name, conn, default) + self._upsert( + value, + account=kwargs.pop("handler_post_upsert_account", None), + peer=kwargs.pop("handler_post_upsert_peer", None), + ) - timestamp = None - if full_result: - if full_result.get("timeStamp"): - timestamp = ( - full_result["timeStamp"]["secondsPastEpoch"] - + full_result["timeStamp"]["nanoseconds"] / 1e9 - ) + def put(self, pv: SharedPV, op: ServerOperation): + value = op.value().raw - return SharedPV( - nt=NTScalar(type), - handler=persist_handler(name, conn), - initial=initial, - timestamp=timestamp, - ) + # The post takes care of updating the timestamp + pv.post( + op.value(), + handler_post_upsert_account=op.account(), + handler_post_upsert_peer=op.peer(), + ) + op.done() # Create an SQLite dayabase to function as our persistence store conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) -cur = conn.cursor() -cur.execute( - "CREATE TABLE IF NOT EXISTS pvs (id varchar(255), data json, PRIMARY KEY (id));" -) -cur.close() +# conn.execute("DROP TABLE IF EXISTS pvs") +conn.execute( + "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" +) # IPv6 addresses can be long and will contain port number as well! 
-duplicate_pv = setup_pv("demo:pv:int", "i", conn, default=12) +duplicate_pv = SharedPV( + nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12 +) pvs = { - "demo:pv:randint": setup_pv("demo:pv:randint", "i", conn, default=-1), + "demo:pv:optime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:optime", conn), + initial=0, + ), # Operational time; total time running + "demo:pv:uptime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:optime", conn), + timestamp=time.time(), + initial=0, + handler_open_restore=False, + ), # Uptime since most recent (re)start "demo:pv:int": duplicate_pv, - "demo:pv:float": setup_pv("demo:pv:float", "d", conn, default=9.99), - "demo:pv:string": setup_pv("demo:pv:string", "s", conn, default="Hello!"), + "demo:pv:float": SharedPV( + nt=NTScalar("d"), + handler=PersistHandler("demo:pv:float", conn), + initial=9.99, + ), + "demo:pv:string": SharedPV( + nt=NTScalar("s"), + handler=PersistHandler("demo:pv:string", conn), + initial="Hello!", + ), "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore } -print(f"Starting server with the following PVs: {pvs.keys()}") +print(f"Starting server with the following PVs: {pvs}") server = None try: server = Server(providers=[pvs]) while True: time.sleep(1) - pvs["demo:pv:randint"].post(random.randint(1, 1000)) + increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1 + pvs["demo:pv:uptime"].post(increment_value) + increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1 + pvs["demo:pv:optime"].post(increment_value) except KeyboardInterrupt: pass finally: From 2f4a32cdc9cd5df9a8e724fb643e61c292c544bb Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Mon, 26 Aug 2024 08:29:08 +0100 Subject: [PATCH 09/10] Improvements to persist example --- example/persist.py | 91 +++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/example/persist.py 
b/example/persist.py index f54c8d99..ec68d1fe 100644 --- a/example/persist.py +++ b/example/persist.py @@ -1,7 +1,8 @@ """ Use a handler to automatically persist values to an SQLite3 file database. Any values persisted this way will be automatically restored when the -program is rerun. The details of users who +program is rerun. The details of users (account name and IP address) are +recorded for puts. Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the program while continuing to monitor the PV. Compare with the value of @@ -13,8 +14,7 @@ The `PersistHandler` will not work as expected if anything other than the value of a field is changed, e.g. if a Control field was added to an NTScalar if would not be persisted correctly. This could be resolved by correctly -merging the pv.current().raw and value.raw appropriately in the post() -and put(). +merging the pv.current().raw and value.raw appropriately in the post(). """ import json @@ -39,21 +39,19 @@ def __init__(self, pv_name: str, conn: sqlite3.Connection): self._conn = conn self._pv_name = pv_name - def open(self, value, **kws): + def open(self, value, handler_open_restore=True): # If there is a value already in the database we always use that # instead of the supplied initial value, unless the # handler_open_restore flag indicates otherwise. - if not kws.pop("handler_open_restore", True): + if not handler_open_restore: return - res = self._conn.execute( - "SELECT data FROM pvs WHERE id=?", [self._pv_name] - ) + # We could, in theory, re-apply authentication here if we queried for + # that information and then did something with it! + res = self._conn.execute("SELECT data FROM pvs WHERE id=?", [self._pv_name]) query_val = res.fetchone() if query_val is not None: - # We could, in theory, re-apply authentication here based on the - # account and peer information. 
json_val = json.loads(query_val[0]) print(f"Will restore to {self._pv_name} value: {json_val['value']}") @@ -63,20 +61,46 @@ def open(self, value, **kws): value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][ "secondsPastEpoch" ] - value["timeStamp.nanoseconds"] = json_val["timeStamp"][ - "nanoseconds" - ] + value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"] else: + # We are using an initial value so persist it self._upsert(value) - def _update_timestamp(self, value): - if not value.changed("timeStamp"): + def post( + self, + pv: SharedPV, + value: Value, + handler_post_upsert_account=None, + handler_post_upsert_peer=None, + ): + self._update_timestamp(value) + + self._upsert( + value, + account=handler_post_upsert_account, + peer=handler_post_upsert_peer, + ) + + def put(self, pv: SharedPV, op: ServerOperation): + # The post does all the real work, we just add info only available + # from the ServerOperation + pv.post( + op.value(), + handler_post_upsert_account=op.account(), + handler_post_upsert_peer=op.peer(), + ) + op.done() + + def _update_timestamp(self, value) -> None: + if not value.changed("timeStamp") or ( + value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0 + ): now = time.time() value["timeStamp.secondsPastEpoch"] = now // 1 value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) def _upsert(self, value, account=None, peer=None) -> None: - # Persist the data + # Persist the data; turn into JSON and write it to the DB val_json = json.dumps(value.todict()) # Use UPSERT: https://sqlite.org/lang_upsert.html @@ -96,26 +120,6 @@ def _upsert(self, value, account=None, peer=None) -> None: ) conn.commit() - def post(self, pv: SharedPV, value: Value, **kwargs): - self._update_timestamp(value) - - self._upsert( - value, - account=kwargs.pop("handler_post_upsert_account", None), - peer=kwargs.pop("handler_post_upsert_peer", None), - ) - - def put(self, pv: SharedPV, op: ServerOperation): - value = op.value().raw - - # The 
post takes care of updating the timestamp - pv.post( - op.value(), - handler_post_upsert_account=op.account(), - handler_post_upsert_peer=op.peer(), - ) - op.done() - # Create an SQLite dayabase to function as our persistence store conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) @@ -138,7 +142,7 @@ def put(self, pv: SharedPV, op: ServerOperation): handler=PersistHandler("demo:pv:optime", conn), timestamp=time.time(), initial=0, - handler_open_restore=False, + handler_open_restore=False, # Note that this option means it will always start at 0 ), # Uptime since most recent (re)start "demo:pv:int": duplicate_pv, "demo:pv:float": SharedPV( @@ -154,12 +158,25 @@ def put(self, pv: SharedPV, op: ServerOperation): "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore } + +# Make the uptime PV readonly; maybe we want to be able to update optime +# after major system upgrades? +uptime_pv = pvs["demo:pv:uptime"] + + +@uptime_pv.on_put +def read_only(pv: SharedPV, op: ServerOperation): + op.done(error="Read-only") + return + + print(f"Starting server with the following PVs: {pvs}") server = None try: server = Server(providers=[pvs]) while True: + # Every second increment the values of uptime and optime time.sleep(1) increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1 pvs["demo:pv:uptime"].post(increment_value) From 49c7e907d04a8497a8c3245b327105ab76d2f648 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Tue, 27 Aug 2024 22:26:21 +0100 Subject: [PATCH 10/10] Move control field initialisation to open initial. 
Improve comments

---
 example/ntscalar_control.py | 132 +++++++++++++++++++++++------------
 1 file changed, 83 insertions(+), 49 deletions(-)

diff --git a/example/ntscalar_control.py b/example/ntscalar_control.py
index 55286430..d785772f 100644
--- a/example/ntscalar_control.py
+++ b/example/ntscalar_control.py
@@ -1,8 +1,30 @@
+"""
+A demonstration of using a handler to apply the Control field logic for a
+Normative Type Scalar (NTScalar).
+
+There is only one PV, but its behaviour is complex:
+- try changing and checking the value, e.g.
+  `python -m p4p.client.cli put demo:pv=4` and
+  `python -m p4p.client.cli get demo:pv`
+Initially the maximum = 11, minimum = -1, and minimum step size = 2.
+Try varying the control settings, e.g.
+- `python -m p4p.client.cli put demo:pv='{"value":5, "control.limitHigh":4}'`
+  `python -m p4p.client.cli get demo:pv`
+Remove the comments at lines 166-169 and try again.
+
+This is also a demonstration of using the open(), put(), and post() callbacks
+to implement this functionality, and particularly how it naturally partitions
+the concerns of the three callback functions:
+- open() - logic based only on the input Value,
+- post() - logic requiring comparison of current and proposed Values
+- put() - authorisation
+"""
+
 from p4p.nt import NTScalar
 from p4p.server import Server
 from p4p.server.raw import Handler
 from p4p.server.thread import SharedPV
-from p4p.wrapper import Type, Value
+from p4p.wrapper import Value
 
 
 class SimpleControl(Handler):
@@ -11,7 +33,7 @@ class SimpleControl(Handler):
     Normative Type.
     """
 
-    def __init__(self, min_value=None, max_value=None, min_step=None):
+    def __init__(self):
         # The attentive reader may wonder why we are keeping track of state here
         # instead of relying on control.limitLow, control.limitHigh, and
         # control.minStep. There are three possible reasons a developer might
@@ -25,63 +47,78 @@ def __init__(self, min_value=None, max_value=None, min_step=None):
         # a Control field.
# The disadvantage of this simple approach is that clients cannot # inspect the Control field values until they have been changed. - self._min_value = min_value # Minimum value allowed - self._max_value = max_value # Maximum value allowed - self._min_step = min_step # Minimum change allowed + self._min_value = None # Minimum value allowed + self._max_value = None # Maximum value allowed + self._min_step = None # Minimum change allowed - def open(self, value): + def open(self, value) -> bool: """ This function manages all logic when we only need to consider the (proposed) future state of a PV """ - # Check if the limitHigh has changed. If it has then we have to reevaluate - # the existing value. Note that for this to work with a post request we - # have to take the actions explained at Ref1 + value_changed_by_limit = False + + # Check if the limitHigh has changed. If it has then we have to + # reevaluate the existing value. Note that for this to work with a + # post() request we have to take the actions explained at Ref1 if value.changed("control.limitHigh"): self._max_value = value["control.limitHigh"] if value["value"] > self._max_value: value["value"] = self._max_value + value_changed_by_limit = True if value.changed("control.limitLow"): self._min_value = value["control.limitLow"] if value["value"] < self._min_value: value["value"] = self._min_value + value_changed_by_limit = True + + # This has to go in the open because it could be set in the initial value + if value.changed("control.minStep"): + self._min_step = value["control.minStep"] # If the value has changed we need to check it against the limits and # change it if any of the limits apply if value.changed("value"): if self._max_value and value["value"] > self._max_value: value["value"] = self._max_value - if self._min_value and value["value"] < self._min_value: + value_changed_by_limit = True + elif self._min_value and value["value"] < self._min_value: value["value"] = self._min_value + 
value_changed_by_limit = True + + return value_changed_by_limit - def post(self, pv, value): + def post(self, pv: SharedPV, value: Value): """ This function manages all logic when we need to know both the current and (proposed) future state of a PV """ - # If the minStep has changed update this instance's minStemp value - if value.changed("control.minStep"): - self._min_change = value["control.minStep"] - # [Ref1] This is where even our simple handler gets complex! # If the value["value"] has not been changed as part of the post() # operation then it will be set to a default value (i.e. 0) and - # marked unchanged. - current_value = pv.current().raw - value_changed = True # TODO: Explain this + # marked unchanged. For the logic in open() to work if the control + # limits are changed we need to set the pv.current().raw value in + # this case. if not value.changed("value"): - value["value"] = current_value["value"] + value["value"] = pv.current().raw["value"] value.mark("value", False) - value_changed = False - # Apply the control limits before the check for minimum change as the - # value may be altered by the limits. 
- self.open(value) - if not value_changed and value.changed("value"): + # Apply the control limits before the check for minimum change because: + # - the self._min_step may be updated + # - the value["value"] may be altered by the limits + value_changed_by_limit = self.open(value) + + # If the value["value"] wasn't changed by the put()/post() but was + # changed by the limits then we don't check the min_step but + # immediately return + if value_changed_by_limit: return - if abs(current_value["value"] - value["value"]) < self._min_step: + if ( + self._min_step + and abs(pv.current().raw["value"] - value["value"]) < self._min_step + ): value.mark("value", False) def put(self, pv, op): @@ -101,9 +138,7 @@ def put(self, pv, op): return else: if op.value().raw.changed("control"): - errmsg = ( - f"Unauthorised attempt to set Control by {op.account()}" - ) + errmsg = f"Unauthorised attempt to set Control by {op.account()}" op.value().raw.mark("control", False) # Because we have not set use_handler_post=False in the post this @@ -113,31 +148,30 @@ def put(self, pv, op): op.done(error=errmsg) -# Construct PV with control and structures -# and then set the values of some of those values with a post +# Construct a PV with Control fields and use a handler to apply the Normative +# Type logic. Note that the Control logic is correctly applied even to the +# initial value, based on the limits set in the rest of the initial value. 
pv = SharedPV( nt=NTScalar("d", control=True), - handler=SimpleControl(-1, 11, 2), - initial=12.0, # Immediately limited to 11 due to handler on live above -) -pv.post( - { - "control.limitHigh": 6, # Value now limited to 6 - } + handler=SimpleControl(), + initial={ + "value": 12.0, + "control.limitHigh": 11, + "control.limitLow": -1, + "control.minStep": 2, + }, # Immediately limited to 11 due to handler ) -@pv.on_put -def handle(pv, op): - pv.post(op.value()) # just store and update subscribers - op.done() +# Override the put in the handler so that we can perform puts for testing +# @pv.on_put +# def handle(pv, op): +# pv.post(op.value()) # just store and update subscribers +# op.done() -print("demo:pv:name: ", pv) -Server.forever( - providers=[ - { - "demo:pv:name": pv, - } - ] -) +pvs = { + "demo:pv": pv, +} +print("PVs: ", pvs) +Server.forever(providers=[pvs])