diff --git a/example/auditor.py b/example/auditor.py
new file mode 100644
index 00000000..48b1680a
--- /dev/null
+++ b/example/auditor.py
@@ -0,0 +1,74 @@
+"""
+In this example we set up a simple auditing mechanism that reports information
+about the last channel changed (which channel, when, and by whom). Since we
+might need to know this information even when the program is not running, we
+persist this data to file, including information about when changes could
+have been made.
+"""
+
+import time
+
+from p4p.nt.scalar import NTScalar
+from p4p.server import Server
+from p4p.server.raw import Handler
+from p4p.server.thread import SharedPV
+
+
+class Auditor(Handler):
+    """Persist information to file so we can audit even when the program is closed"""
+
+    def open(self, value):
+        with open("audit.log", mode="a+") as f:
+            f.write(f"Auditing opened at {time.ctime()}\n")
+
+    def close(self, pv):
+        with open("audit.log", mode="a+") as f:
+            value = pv.current().raw["value"]
+            if value:
+                f.write(f"Auditing closed at {time.ctime()}; {value}\n")
+            else:
+                f.write(f"Auditing closed at {time.ctime()}; no changes made\n")
+
+
+class Audited(Handler):
+    """Forward information about Put operations to the auditing PV"""
+
+    def __init__(self, pv: SharedPV):
+        self._audit_pv = pv
+
+    def put(self, pv, op):
+        pv.post(op.value())
+        self._audit_pv.post(
+            f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}"
+        )
+        op.done()
+
+
+# Set up the PV that will make the audit information available.
+# Note that there is no put in its handler, so it will be externally read-only
+auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="")
+
+# Set up some PVs that will be audited and one that won't be.
+# Note that the Audited handler does have a put, so these PVs can be changed externally
+pvs = {
+    "demo:pv:auditor": auditor_pv,
+    "demo:pv:audited_d": SharedPV(
+        nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99
+    ),
+    "demo:pv:audited_i": SharedPV(
+        nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4
+    ),
+    "demo:pv:audited_s": SharedPV(
+        nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing"
+    ),
+    "demo:pv:unaudited_i": SharedPV(nt=NTScalar("i"), initial=-1),
+}
+
+print(pvs.keys())
+try:
+    Server.forever(providers=[pvs])
+except KeyboardInterrupt:
+    pass
+finally:
+    # We need to close the auditor PV manually; the server's stop() won't do it for us
+    auditor_pv.close()
diff --git a/example/ntscalar_control.py b/example/ntscalar_control.py
new file mode 100644
index 00000000..d785772f
--- /dev/null
+++ b/example/ntscalar_control.py
@@ -0,0 +1,177 @@
+"""
+A demonstration of using a handler to apply the Control field logic for a
+Normative Type Scalar (NTScalar).
+
+There is only one PV, but its behaviour is complex:
+- Try changing and checking the value, e.g.
+  `python -m p4p.client.cli put demo:pv=4` and
+  `python -m p4p.client.cli get demo:pv`
+
+Initially the maximum = 11, minimum = -1, and minimum step size = 2.
+Try varying the control settings, e.g.
+- `python -m p4p.client.cli put demo:pv='{"value":5, "control.limitHigh":4}'` and
+  `python -m p4p.client.cli get demo:pv`
+
+Remove the comments at lines 166-169 and try again.
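+
+The minimum step means small changes are silently ignored, e.g. after
+`python -m p4p.client.cli put demo:pv=4`, a subsequent
+`python -m p4p.client.cli put demo:pv=5` should leave the value at 4,
+because the requested change is smaller than minStep = 2.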
+
+This is also a demonstration of using the open(), put(), and post() callbacks
+to implement this functionality, and particularly how it naturally partitions
+the concerns of the three callback functions:
+- open() - logic based only on the input Value,
+- post() - logic requiring comparison of the current and proposed Values,
+- put() - authorisation.
+"""
+
+from p4p.nt import NTScalar
+from p4p.server import Server
+from p4p.server.raw import Handler
+from p4p.server.thread import SharedPV
+from p4p.wrapper import Value
+
+
+class SimpleControl(Handler):
+    """
+    A simple handler that implements the logic for the Control field of a
+    Normative Type.
+    """
+
+    def __init__(self):
+        # The attentive reader may wonder why we are keeping track of state here
+        # instead of relying on control.limitLow, control.limitHigh, and
+        # control.minStep. There are three possible reasons a developer might
+        # choose an implementation like this:
+        # - As [Ref1] shows, it's not straightforward to maintain state using
+        #   the PV's own fields
+        # - A developer may wish to have the limits apply as soon as the
+        #   Channel is open. If an initial value is set then this may happen
+        #   before the first post().
+        # - It is possible to adapt this handler so it could be used without
+        #   a Control field.
+        # The disadvantage of this simple approach is that clients cannot
+        # inspect the Control field values until they have been changed.
+        self._min_value = None  # Minimum value allowed
+        self._max_value = None  # Maximum value allowed
+        self._min_step = None  # Minimum change allowed
+
+    def open(self, value) -> bool:
+        """
+        This function manages all logic when we only need to consider the
+        (proposed) future state of a PV
+        """
+        value_changed_by_limit = False
+
+        # Check if the limitHigh has changed. If it has then we have to
+        # reevaluate the existing value. Note that for this to work with a
+        # post() request we have to take the actions explained at Ref1
+        if value.changed("control.limitHigh"):
+            self._max_value = value["control.limitHigh"]
+            if value["value"] > self._max_value:
+                value["value"] = self._max_value
+                value_changed_by_limit = True
+
+        if value.changed("control.limitLow"):
+            self._min_value = value["control.limitLow"]
+            if value["value"] < self._min_value:
+                value["value"] = self._min_value
+                value_changed_by_limit = True
+
+        # This has to go in the open() because it could be set in the initial value
+        if value.changed("control.minStep"):
+            self._min_step = value["control.minStep"]
+
+        # If the value has changed we need to check it against the limits and
+        # change it if any of the limits apply. The explicit `is not None`
+        # checks ensure that limits of 0 are still honoured.
+        if value.changed("value"):
+            if self._max_value is not None and value["value"] > self._max_value:
+                value["value"] = self._max_value
+                value_changed_by_limit = True
+            elif self._min_value is not None and value["value"] < self._min_value:
+                value["value"] = self._min_value
+                value_changed_by_limit = True
+
+        return value_changed_by_limit
+
+    def post(self, pv: SharedPV, value: Value):
+        """
+        This function manages all logic when we need to know both the
+        current and (proposed) future state of a PV
+        """
+        # [Ref1] This is where even our simple handler gets complex!
+        # If the value["value"] has not been changed as part of the post()
+        # operation then it will be set to a default value (i.e. 0) and
+        # marked unchanged. For the logic in open() to work when the control
+        # limits are changed, we need to copy in the pv.current().raw value in
+        # this case.
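+        # For example, a put of '{"control.limitHigh": 4}' alone arrives with
+        # the value field unmarked and holding the default 0; without the fix
+        # below, open() would clamp against that 0 instead of the PV's real
+        # current value.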
+        if not value.changed("value"):
+            value["value"] = pv.current().raw["value"]
+            value.mark("value", False)
+
+        # Apply the control limits before the check for minimum change because:
+        # - the self._min_step may be updated
+        # - the value["value"] may be altered by the limits
+        value_changed_by_limit = self.open(value)
+
+        # If the value["value"] wasn't changed by the put()/post() but was
+        # changed by the limits then we don't check the min_step but
+        # immediately return
+        if value_changed_by_limit:
+            return
+
+        if (
+            self._min_step
+            and abs(pv.current().raw["value"] - value["value"]) < self._min_step
+        ):
+            value.mark("value", False)
+
+    def put(self, pv, op):
+        """
+        In most cases the combination of a put() and post() means that the
+        put() is solely concerned with issues of authorisation.
+        """
+        # Demo authorisation:
+        # - only Alice may remotely change the Control limits
+        # - Bob is forbidden from changing anything on this Channel
+        # - everyone else may change the value but not the Control limits
+        errmsg = None
+        if op.account() == "Alice":
+            pass
+        elif op.account() == "Bob":
+            op.done(error="Bob is forbidden to make changes!")
+            return
+        else:
+            if op.value().raw.changed("control"):
+                errmsg = f"Unauthorised attempt to set Control by {op.account()}"
+                op.value().raw.mark("control", False)
+
+        # Because pv.post() automatically invokes the handler's post(), this
+        # call will trigger evaluation of the post rule and thus the
+        # application of the Control limits
+        pv.post(op.value())
+        op.done(error=errmsg)
+
+
+# Construct a PV with Control fields and use a handler to apply the Normative
+# Type logic. Note that the Control logic is correctly applied even to the
+# initial value, based on the limits set in the rest of the initial value.
+pv = SharedPV(
+    nt=NTScalar("d", control=True),
+    handler=SimpleControl(),
+    initial={
+        "value": 12.0,
+        "control.limitHigh": 11,
+        "control.limitLow": -1,
+        "control.minStep": 2,
+    },  # Immediately limited to 11 due to the handler
+)
+
+
+# Override the put in the handler so that we can perform puts for testing
+# @pv.on_put
+# def handle(pv, op):
+#     pv.post(op.value())  # just store and update subscribers
+#     op.done()
+
+
+pvs = {
+    "demo:pv": pv,
+}
+print("PVs: ", pvs)
+Server.forever(providers=[pvs])
diff --git a/example/persist.py b/example/persist.py
new file mode 100644
index 00000000..ec68d1fe
--- /dev/null
+++ b/example/persist.py
@@ -0,0 +1,190 @@
+"""
+Use a handler to automatically persist values to an SQLite3 file database.
+Any values persisted this way will be automatically restored when the
+program is rerun. The details of users (account name and IP address) are
+recorded for puts.
+
+Try monitoring the PV `demo:pv:optime`, then quit, wait, and restart the
+program while continuing to monitor the PV. Compare with the value of
+`demo:pv:uptime`, which resets on each program start. Try setting the value
+of `demo:pv:optime` while continuing to monitor it. It is recommended to
+inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`.
+
+There is an important caveat for this simple demo:
+the `PersistHandler` will not work as expected if anything other than the
+value of a field is changed, e.g. if a Control field was added to an NTScalar
+it would not be persisted correctly. This could be resolved by merging the
+pv.current().raw and value.raw appropriately in the post().
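+
+The account and peer columns record the origin of the most recent write;
+posts made internally by the program leave them NULL. For example, try
+`sqlite3 persist_pvs.db "select id, account, peer from pvs"` (exact output
+will vary).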
+""" + +import json +import sqlite3 +import time + +from p4p import Value +from p4p.nt.scalar import NTScalar +from p4p.server import Server, ServerOperation +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class PersistHandler(Handler): + """ + A handler that will allow simple persistence of values and timestamps + across retarts. It requires a post handler in order to persist values + set within the program. + """ + + def __init__(self, pv_name: str, conn: sqlite3.Connection): + self._conn = conn + self._pv_name = pv_name + + def open(self, value, handler_open_restore=True): + # If there is a value already in the database we always use that + # instead of the supplied initial value, unless the + # handler_open_restore flag indicates otherwise. + if not handler_open_restore: + return + + # We could, in theory, re-apply authentication here if we queried for + # that information and then did something with it! + res = self._conn.execute("SELECT data FROM pvs WHERE id=?", [self._pv_name]) + query_val = res.fetchone() + + if query_val is not None: + json_val = json.loads(query_val[0]) + print(f"Will restore to {self._pv_name} value: {json_val['value']}") + + # Override initial value + value["value"] = json_val["value"] + + value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][ + "secondsPastEpoch" + ] + value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"] + else: + # We are using an initial value so persist it + self._upsert(value) + + def post( + self, + pv: SharedPV, + value: Value, + handler_post_upsert_account=None, + handler_post_upsert_peer=None, + ): + self._update_timestamp(value) + + self._upsert( + value, + account=handler_post_upsert_account, + peer=handler_post_upsert_peer, + ) + + def put(self, pv: SharedPV, op: ServerOperation): + # The post does all the real work, we just add info only available + # from the ServerOperation + pv.post( + op.value(), + handler_post_upsert_account=op.account(), + handler_post_upsert_peer=op.peer(), + ) + op.done() + + def _update_timestamp(self, value) -> None: + if not value.changed("timeStamp") or ( + value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0 + ): + now = time.time() + value["timeStamp.secondsPastEpoch"] = now // 1 + value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) + + def _upsert(self, value, account=None, peer=None) -> None: + # Persist the data; turn into JSON and write it to the DB + val_json = json.dumps(value.todict()) + + # Use UPSERT: https://sqlite.org/lang_upsert.html + conn.execute( + """ + INSERT INTO pvs (id, data, account, peer) + VALUES (:name, :json_data, :account, :peer) + ON CONFLICT(id) + DO UPDATE SET data = :json_data, account = :account, peer = :peer; + """, + { + "name": self._pv_name, + "json_data": val_json, + "account": account, + "peer": peer, + }, + ) + conn.commit() + + +# Create an SQLite dayabase to function as our persistence store +conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) +# conn.execute("DROP TABLE IF EXISTS pvs") +conn.execute( + "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" +) # IPv6 addresses can be long and will contain port number as well! 
+
+duplicate_pv = SharedPV(
+    nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12
+)
+pvs = {
+    "demo:pv:optime": SharedPV(
+        nt=NTScalar("i"),
+        handler=PersistHandler("demo:pv:optime", conn),
+        initial=0,
+    ),  # Operational time; total time running
+    "demo:pv:uptime": SharedPV(
+        nt=NTScalar("i"),
+        handler=PersistHandler("demo:pv:uptime", conn),
+        timestamp=time.time(),
+        initial=0,
+        handler_open_restore=False,  # Note that this option means it will always start at 0
+    ),  # Uptime since most recent (re)start
+    "demo:pv:int": duplicate_pv,
+    "demo:pv:float": SharedPV(
+        nt=NTScalar("d"),
+        handler=PersistHandler("demo:pv:float", conn),
+        initial=9.99,
+    ),
+    "demo:pv:string": SharedPV(
+        nt=NTScalar("s"),
+        handler=PersistHandler("demo:pv:string", conn),
+        initial="Hello!",
+    ),
+    "demo:pv:alias_int": duplicate_pv,  # Aliasing works, except the restore is reported under the original name
+}
+
+
+# Make the uptime PV read-only; maybe we want to be able to update optime
+# after major system upgrades?
+uptime_pv = pvs["demo:pv:uptime"]
+
+
+@uptime_pv.on_put
+def read_only(pv: SharedPV, op: ServerOperation):
+    op.done(error="Read-only")
+
+
+print(f"Starting server with the following PVs: {pvs}")
+
+server = None
+try:
+    server = Server(providers=[pvs])
+    while True:
+        # Every second increment the values of uptime and optime
+        time.sleep(1)
+        increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1
+        pvs["demo:pv:uptime"].post(increment_value)
+        increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1
+        pvs["demo:pv:optime"].post(increment_value)
except KeyboardInterrupt:
+    pass
+finally:
+    if server:
+        server.stop()
+    conn.close()
diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py
index 2629f780..1219af47 100644
--- a/src/p4p/server/raw.py
+++ b/src/p4p/server/raw.py
@@ -39,6 +39,12 @@ class Handler(object):
     Use of this as a base class is optional.
     """
+    def open(self, value, **kws):
+        """
+        Called each time an Open operation is performed on this Channel
+
+        :param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
+        """
 
     def put(self, pv, op):
         """
@@ -50,6 +56,17 @@ def put(self, pv, op):
         """
         op.done(error='Not supported')
 
+    def post(self, pv, value, **kws):
+        """
+        Called each time a post operation is performed on this Channel,
+        e.g. via :py:meth:`SharedPV.post`.
+
+        :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+        :param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
+        :param kws: Keyword arguments prefixed with ``handler_post_`` forwarded from :py:meth:`SharedPV.post`.
+        """
+        pass
+
     def rpc(self, pv, op):
         """
         Called each time a client issues a Remote Procedure Call
@@ -76,6 +93,13 @@ def onLastDisconnect(self, pv):
         """
         pass
 
+    def close(self, pv):
+        """
+        Called when the Channel is closed.
+
+        :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
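+
+        Note: in this change close() is invoked from :py:meth:`SharedPV.close` before the underlying Channel is closed.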
+ """ + pass class SharedPV(_SharedPV): @@ -154,10 +178,23 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): self._wrap = wrap or (nt and nt.wrap) or self._wrap self._unwrap = unwrap or (nt and nt.unwrap) or self._unwrap + # Intercept all arguments that start with 'handler_open_' and remove them from + # the arguments that go to the wrap and send them instead to the handler.open() + post_kws = {x: kws.pop(x) for x in [y for y in kws if y.startswith("handler_open_")]} + try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.open(V, **post_kws) + except AttributeError as err: + pass + _SharedPV.open(self, V) def post(self, value, **kws): @@ -170,12 +207,43 @@ def post(self, value, **kws): Any keyword arguments are forwarded to the NT wrap() method (if applicable). Common arguments include: timestamp= , severity= , and message= . """ + # Intercept all arguments that start with 'handler_post_' and remove them from + # the arguments that go to the wrap and send them instead to the handler.post() + post_kws = {x: kws.pop(x) for x in [y for y in kws if y.startswith("handler_post_")]} + try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.post(self, V, **post_kws) + except AttributeError: + pass + _SharedPV.post(self, V) + def close(self, destroy=False, sync=False, timeout=None): + """Close PV, disconnecting any clients. + + :param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open(). + :param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies). + :param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value. + + close() with destory=True or sync=True will not prevent clients from re-connecting. + New clients may prevent sync=True from succeeding. + Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`, + or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV. 
+ """ + try: + self._handler.close(self) + except AttributeError: + pass + + _SharedPV.close(self) + def current(self): V = _SharedPV.current(self) try: @@ -208,7 +276,15 @@ def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real + def open(self, value, **kws): + _log.debug('OPEN %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.open, value, **kws) + except AttributeError: + pass + def onFirstConnect(self): + _log.debug('ONFIRSTCONNECT %s', self._pv) self._pv._exec(None, self._pv._onFirstConnect, None) try: # user handler may omit onFirstConnect() M = self._real.onFirstConnect @@ -217,6 +293,7 @@ def onFirstConnect(self): self._pv._exec(None, M, self._pv) def onLastDisconnect(self): + _log.debug('ONLASTDISCONNECT %s', self._pv) try: M = self._real.onLastDisconnect except AttributeError: @@ -239,33 +316,76 @@ def rpc(self, op): except AttributeError: op.done(error="RPC not supported") + def post(self, value, **kws): + _log.debug('POST %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.rpc, self._pv, value, **kws) + except AttributeError: + pass + + def close(self, pv): + _log.debug('CLOSE %s', self._pv) + try: + self._pv._exec(None, self._real.close, self._pv) + except AttributeError: + pass + @property - def onFirstConnect(self): + def on_open(self): + def decorate(fn): + self._handler.open = fn + return fn + return decorate + + @property + def on_first_connect(self): def decorate(fn): self._handler.onFirstConnect = fn return fn return decorate @property - def onLastDisconnect(self): + def on_last_disconnect(self): def decorate(fn): self._handler.onLastDisconnect = fn return fn return decorate @property - def put(self): + def on_put(self): def decorate(fn): self._handler.put = fn return fn return decorate @property - def rpc(self): + def on_rpc(self): def decorate(fn): self._handler.rpc = fn return fn return decorate + + @property + def on_post(self): + def decorate(fn): + self._handler.post = fn + return fn + return decorate + + @property + def on_close(self): + def decorate(fn): + self._handler.close = fn + return fn + return decorate + + # Aliases for decorators to maintain consistent new style + # Required because post is already used and on_post seemed the best + # alternative. + put = on_put + rpc = on_rpc + onFirstConnect = on_first_connect + onLastDisconnect = on_last_disconnect def __repr__(self): if self.isOpen(): diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py index a810ba32..ea3cb071 100644 --- a/src/p4p/test/test_sharedpv.py +++ b/src/p4p/test/test_sharedpv.py @@ -3,6 +3,7 @@ import logging import unittest import random +from unittest.mock import patch import weakref import sys import gc @@ -17,6 +18,7 @@ from ..wrapper import Value, Type from ..client.thread import Context, Disconnected, TimeoutError, RemoteError from ..server import Server, StaticProvider +from ..server.raw import Handler from ..server.thread import SharedPV, _defaultWorkQueue from ..util import WorkQueue from ..nt import NTScalar, NTURI @@ -149,6 +151,359 @@ def testMonitor(self): gc.collect() self.assertIsNone(C()) +class TestGPMNewHandler(RefTestCase): + maxDiff = 1000 + timeout = 1.0 + + class Times3Handler(Handler): + # Note that the prototypes of open() and post() return None and here + # we have them returning bool. 
+        # A more rigorous solution might use Exceptions instead, which would
+        # also allow an error message to be passed to the client via the
+        # op.done()
+        def open(self, value):
+            if value.changed('value'):
+                if value["value"] < 0:
+                    value.unmark()
+                    return False
+                value["value"] = value["value"] * 3
+
+            return True
+
+        def post(self, pv, value, **kws):
+            if not kws.pop("handler_post_enable", True):
+                return
+
+            return self.open(value)
+
+        def put(self, pv, op):
+            if not self.post(pv, op.value().raw):
+                op.done(error="Must be non-negative")
+                return
+            pv.post(op.value(), handler_post_enable=False)
+            op.done()
+
+    def setUp(self):
+        # gc.set_debug(gc.DEBUG_LEAK)
+        super(TestGPMNewHandler, self).setUp()
+
+        self.pv = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d'))
+        self.pv2 = SharedPV(handler=self.Times3Handler(), nt=NTScalar('d'), initial=42.0)
+        self.sprov = StaticProvider("serverend")
+        self.sprov.add('foo', self.pv)
+        self.sprov.add('bar', self.pv2)
+
+        self.server = Server(providers=[self.sprov], isolate=True)
+        _log.debug('Server Conf: %s', self.server.conf())
+
+    def tearDown(self):
+        self.server.stop()
+        _defaultWorkQueue.sync()
+        #self.pv._handler._pv = None
+        R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)]
+        r = None
+        del self.server
+        del self.sprov
+        del self.pv
+        del self.pv2
+        gc.collect()
+        R = [r() for r in R]
+        self.assertListEqual(R, [None] * len(R))
+        super(TestGPMNewHandler, self).tearDown()
+
+    def testCurrent(self):
+        self.pv.open(1.0)
+        self.assertEqual(self.pv.current(), 3.0)
+
+    def testGet(self):
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt:
+            _log.debug('Client conf: %s', ctxt.conf())
+            # PV not yet opened
+            self.assertRaises(TimeoutError, ctxt.get, 'foo', timeout=0.1)
+
+            self.pv.open(1.0)
+
+            V = ctxt.get('foo')
+            self.assertEqual(V, 3.0)
+            self.assertTrue(V.raw.changed('value'))
+
+            self.assertEqual(ctxt.get(['foo', 'bar']), [1.0 * 3, 42.0 * 3])
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
+    def testPutGet(self):
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt:
+
+            self.pv.open(1.0)
+
+            V = ctxt.get('foo')
+            self.assertEqual(V, 3.0)
+
+            ctxt.put('foo', 5)
+
+            V = ctxt.get('foo')
+            self.assertEqual(V, 15.0)
+
+            ctxt.put(['foo', 'bar'], [5, 6])
+
+            self.assertEqual(ctxt.get(['foo', 'bar']), [5 * 3, 6 * 3])
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
+    def testMonitor(self):
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt:
+
+            self.pv.open(1.0)
+
+            Q = Queue(maxsize=4)
+            sub = ctxt.monitor('foo', Q.put, notify_disconnect=True)
+
+            V = Q.get(timeout=self.timeout)
+            self.assertIsInstance(V, Disconnected)
+
+            V = Q.get(timeout=self.timeout)
+            self.assertEqual(V, 3.0)
+
+            ctxt.put('foo', 4)
+
+            V = Q.get(timeout=self.timeout)
+            self.assertEqual(V, 12.0)
+
+            self.pv.close()
+
+            V = Q.get(timeout=self.timeout)
+            self.assertIsInstance(V, Disconnected)
+
+            self.pv.open(3.0)
+            ctxt.hurryUp()
+
+            V = Q.get(timeout=self.timeout)
+            self.assertEqual(V, 9.0)
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        del sub
+        del Q
+        gc.collect()
+        self.assertIsNone(C())
+
+
+class TestNewHandler(RefTestCase):
+    maxDiff = 1000
+    timeout = 1.0
+
+    class ValueChangeHandler(Handler):
+        """Check whether calls work as expected"""
+        put_use_handler_post = True
+
+        def open(self, value, **kwargs):
+            if not kwargs.pop("handler_open_enable", True):
+                return
+
+            value["value"] = 1.1
+
+        def onFirstConnect(self, pv):
+            # Not intended to allow changes to the pv
+            pass
+
+        def onLastDisconnect(self, pv):
+            # Not intended to allow changes to the pv
+            pass
+
+        def post(self, pv, value, **kwargs):
+            if not kwargs.pop("handler_post_enable", True):
+                return
+
+            value["value"] = 2.2
+
+        def put(self, pv, op):
+            op.value().raw["value"] = 3.3
+            pv.post(op.value(), handler_post_enable=self.put_use_handler_post)
+            op.done()
+
+        def close(self, pv):
+            # Not intended to allow changes to the pv
+            pass
+
+    def setUp(self):
+        # gc.set_debug(gc.DEBUG_LEAK)
+        super(TestNewHandler, self).setUp()
+
+        self.pv = SharedPV(nt=NTScalar('d'), handler=self.ValueChangeHandler())
+        self.pv2 = SharedPV(nt=NTScalar('d'), initial=42.0)  # No handler
+        self.sprov = StaticProvider("serverend")
+        self.sprov.add('foo', self.pv)
+        self.sprov.add('bar', self.pv2)
+
+        self.server = Server(providers=[self.sprov], isolate=True)
+        _log.debug('Server Conf: %s', self.server.conf())
+
+    def tearDown(self):
+        self.server.stop()
+        _defaultWorkQueue.sync()
+        #self.pv._handler._pv = None
+        R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)]
+        r = None
+        del self.server
+        del self.sprov
+        del self.pv
+        del self.pv2
+        gc.collect()
+        R = [r() for r in R]
+        self.assertListEqual(R, [None] * len(R))
+        super(TestNewHandler, self).tearDown()
+
+    def test_open(self):
+        # Note that the mock.patch changes the ValueChangeHandler.open() into a noop
+        with patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.open') as mock_open:
+            # Test handler.open() calls are processed correctly
+            self.pv.open(1.0)
+            self.assertEqual(self.pv.current(), 1.0)
+            mock_open.assert_called_once()
+            mock_open.reset_mock()
+
+            # Test handler.open() calls are processed correctly after close()
+            self.pv.close()
+            self.pv.open(2.0)
+            self.assertEqual(self.pv.current(), 2.0)
+            mock_open.assert_called_once()
+            mock_open.reset_mock()
+
+            # Test nothing goes wrong when we have no handler set in the SharedPV
+            self.pv2.close()
+            self.pv2.open(1.0)
+            self.assertEqual(self.pv2.current(), 1.0)
+            mock_open.assert_not_called()
+            mock_open.reset_mock()
+
+        # Check that value changes in a handler happen correctly
+        self.pv.close()
+        self.pv.open(55.0)
+        self.assertEqual(self.pv.current(), 1.1)
+
+        # Check that handler_open_ arguments are passed correctly
+        self.pv.close()
+        self.pv.open(33.0, handler_open_enable=False)
+        self.assertEqual(self.pv.current(), 33.0)
+
+    def test_post(self):
+        # Note that the mock.patch changes the ValueChangeHandler.post() into a noop
+        with patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.post') as mock_post:
+            # Test handler.post() calls are processed correctly; again the patch means our function isn't called
+            self.pv.open(1.0)
+            self.pv.post(5.0)
+            mock_post.assert_called_once()
+            self.assertEqual(self.pv.current(), 5.0)
+            mock_post.reset_mock()
+
+            # Test nothing goes wrong when we have no handler set in the SharedPV
+            self.pv2.post(6.0)
+            mock_post.assert_not_called()
+            self.assertEqual(self.pv2.current(), 6.0)
+            mock_post.reset_mock()
+
+        # Check that value changes in a handler happen correctly
+        self.pv.close()
+        self.pv.open(1.0)
+        self.pv.post(9.9)
+        self.assertEqual(self.pv.current(), 2.2)
+
+        # Check that handler_post_ arguments are passed correctly
+        self.pv.close()
+        self.pv.open(1.0)
+        self.pv.post(77.0, handler_post_enable=False)
+        self.assertEqual(self.pv.current(), 77.0)
+
+    def test_get(self):
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt:
+            _log.debug('Client conf: %s', ctxt.conf())
+            # PV not yet opened
+            self.assertRaises(TimeoutError, ctxt.get, 'foo', timeout=0.1)
+
+            self.pv.open(1.0)
+
+            V = ctxt.get('foo')
+            self.assertEqual(V, 1.1)
+            self.assertTrue(V.raw.changed('value'))
+
+            self.assertEqual(ctxt.get(['foo', 'bar']), [1.1, 42.0])
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
+    def test_put_get(self):
+        # Test handler.put() is called as expected and nothing breaks if it is absent
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt, \
+             patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.put') as mock_put:
+
+            # Check the handler.put() is called
+            self.pv.open(1.0)
+
+            V = ctxt.get('foo')
+            self.assertEqual(V, 1.1)
+
+            # This will time out as the patch has rendered the handler.put() a noop,
+            # so we'll never issue an op.done(), but we can still check it was called
+            self.assertRaises(TimeoutError, ctxt.put, 'foo', 5, timeout=0.1)
+            mock_put.assert_called_once()
+            mock_put.reset_mock()
+
+            # Check that the new code does not affect the operation of a SharedPV with no handler
+            # This put fails with a RemoteError because a PV without a handler doesn't allow puts!
+            V = ctxt.get('bar')
+            self.assertEqual(V, 42.0)
+
+            self.assertRaises(RemoteError, ctxt.put, 'bar', 5, timeout=0.1)
+            mock_put.assert_not_called()
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
+        # Test interaction with handler.post()
+        self.pv.close()
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt, \
+             patch('p4p.test.test_sharedpv.TestNewHandler.ValueChangeHandler.post') as mock_post:
+
+            self.pv.open(1.0)
+            V = ctxt.get('foo')
+            self.assertEqual(V, 1.1)
+
+            ctxt.put('foo', 15)
+
+            self.assertEqual(ctxt.get('foo'), 3.3)
+            mock_post.assert_called_once()
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
+        # Test interaction with handler.post() without any patches to interfere
+        self.pv.close()
+        with Context('pva', conf=self.server.conf(), useenv=False) as ctxt:
+
+            self.pv.open(1.0)
+            V = ctxt.get('foo')
+            self.assertEqual(V, 1.1)
+
+            ctxt.put('foo', 15)
+
+            self.assertEqual(ctxt.get('foo'), 2.2)
+
+        C = weakref.ref(ctxt)
+        del ctxt
+        gc.collect()
+        self.assertIsNone(C())
+
 
 class TestRPC(RefTestCase):
     maxDiff = 1000
     timeout = 1.0
@@ -235,6 +590,7 @@ def test_rpc_error(self):
         with self.assertRaisesRegex(RemoteError, 'oops'):
             ret = C.rpc('foo', args.wrap('foo', kws={'oops':True}))
 
+
 class TestRPC2(TestRPC):
     openclose = True
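
A minimal usage sketch of the new `on_post`/`on_put` decorator style together with the `handler_post_` keyword forwarding introduced above. The PV name `demo:clamped`, the clamping rule, and the `handler_post_force` keyword are illustrative assumptions, not part of this change:

```python
from p4p.nt import NTScalar
from p4p.server import Server
from p4p.server.thread import SharedPV

pv = SharedPV(nt=NTScalar("d"), initial=0.0)


@pv.on_post  # new-style decorator; equivalent to assigning handler.post
def clamp(pv, value, **kws):
    # Illustrative rule: drop negative updates unless the caller passes
    # handler_post_force=True to pv.post(); the 'handler_post_' prefix is what
    # routes the keyword here instead of to the NT wrap().
    if not kws.pop("handler_post_force", False) and value["value"] < 0:
        value.mark("value", False)


@pv.on_put
def handle(pv, op):
    pv.post(op.value())  # routes through clamp() above
    op.done()


Server.forever(providers=[{"demo:clamped": pv}])
```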