Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional callbacks for SharedPV handlers #155

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
74 changes: 74 additions & 0 deletions example/auditor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
In this example we setup a simple auditing mechanism that reports information
about the last channel changed (which channel, when, and who by). Since we
might need to know this information even when the program is not running we
persist this data to file, including information about when changes could
have been made.
"""

import time

from p4p.nt.scalar import NTScalar
from p4p.server import Server
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV


class Auditor(Handler):
"""Persist information to file so we can audit when the program is closed"""

def open(self, value):
with open("audit.log", mode="a+") as f:
f.write(f"Auditing opened at {time.ctime()}\n")

def close(self, pv):
with open("audit.log", mode="a+") as f:
value = pv.current().raw["value"]
if value:
f.write(f"Auditing closed at {time.ctime()}; {value}\n")
else:
f.write(f"Auditing closed at {time.ctime()}; no changes made\n")


class Audited(Handler):
"""Forward information about Put operations to the auditing PV"""

def __init__(self, pv: SharedPV):
self._audit_pv = pv

def put(self, pv, op):
pv.post(op.value())
self._audit_pv.post(
f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}"
)
op.done()


# Setup the PV that will make the audit information available.
# Note that there is no put in its handler so it will be externally read-only
auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="")

# Setup some PVs that will be audited and one that won't be
# Note that the audited handler does have a put so these PVs can be changed externally
pvs = {
"demo:pv:auditor": auditor_pv,
"demo:pv:audited_d": SharedPV(
nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99
),
"demo:pv:audited_i": SharedPV(
nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4
),
"demo:pv:audited_s": SharedPV(
nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing"
),
"demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1),
}

print(pvs.keys())
try:
Server.forever(providers=[pvs])
except KeyboardInterrupt:
pass
finally:
# We need to close the auditor PV manually, the server stop() won't do it for us
auditor_pv.close()
143 changes: 143 additions & 0 deletions example/ntscalar_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from p4p.nt import NTScalar
from p4p.server import Server
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV
from p4p.wrapper import Type, Value


class SimpleControl(Handler):
"""
A simple handler that implements the logic for the Control field of a
Normative Type.
"""

def __init__(self, min_value=None, max_value=None, min_step=None):
# The attentive reader may wonder why we are keeping track of state here
# instead of relying on control.limitLow, control.limitHigh, and
# control.minStep. There are three possible reasons a developer might
# choose an implementation like this:
# - As [Ref1] shows it's not straightforward to maintain state using
# the PV's own fields
# - A developer may wish to have the limits apply as soon as the
# Channel is open. If an initial value is set then this may happen
# before the first post().
# - It is possible to adapt this handler so it could be used without
# a Control field.
# The disadvantage of this simple approach is that clients cannot
# inspect the Control field values until they have been changed.
self._min_value = min_value # Minimum value allowed
self._max_value = max_value # Maximum value allowed
self._min_step = min_step # Minimum change allowed

def open(self, value):
"""
This function manages all logic when we only need to consider the
(proposed) future state of a PV
"""
# Check if the limitHigh has changed. If it has then we have to reevaluate
# the existing value. Note that for this to work with a post request we
# have to take the actions explained at Ref1
if value.changed("control.limitHigh"):
self._max_value = value["control.limitHigh"]
if value["value"] > self._max_value:
value["value"] = self._max_value

if value.changed("control.limitLow"):
self._min_value = value["control.limitLow"]
if value["value"] < self._min_value:
value["value"] = self._min_value

# If the value has changed we need to check it against the limits and
# change it if any of the limits apply
if value.changed("value"):
if self._max_value and value["value"] > self._max_value:
value["value"] = self._max_value
if self._min_value and value["value"] < self._min_value:
value["value"] = self._min_value

def post(self, pv, value):
"""
This function manages all logic when we need to know both the
current and (proposed) future state of a PV
"""
# If the minStep has changed update this instance's minStemp value
if value.changed("control.minStep"):
self._min_change = value["control.minStep"]

# [Ref1] This is where even our simple handler gets complex!
# If the value["value"] has not been changed as part of the post()
# operation then it will be set to a default value (i.e. 0) and
# marked unchanged.
current_value = pv.current().raw
value_changed = True # TODO: Explain this
if not value.changed("value"):
value["value"] = current_value["value"]
value.mark("value", False)
value_changed = False

# Apply the control limits before the check for minimum change as the
# value may be altered by the limits.
self.open(value)
if not value_changed and value.changed("value"):
return

if abs(current_value["value"] - value["value"]) < self._min_step:
value.mark("value", False)

def put(self, pv, op):
"""
In most cases the combination of a put() and post() means that the
put() is solely concerned with issues of authorisation.
"""
# Demo authorisation.
# Only Alice may remotely change the Control limits
# Bob is forbidden from changing anything on this Channel
# Everyone else may change the value but not the Control limits
errmsg = None
if op.account() == "Alice":
pass
elif op.account() == "Bob":
op.done(error="Bob is forbidden to make changes!")
return
else:
if op.value().raw.changed("control"):
errmsg = (
f"Unauthorised attempt to set Control by {op.account()}"
)
op.value().raw.mark("control", False)

# Because we have not set use_handler_post=False in the post this
# will automatically trigger evaluation of the post rule and thus
# the application of
pv.post(op.value())
op.done(error=errmsg)


# Construct PV with control and structures
# and then set the values of some of those values with a post
pv = SharedPV(
nt=NTScalar("d", control=True),
handler=SimpleControl(-1, 11, 2),
initial=12.0, # Immediately limited to 11 due to handler on live above
)
pv.post(
{
"control.limitHigh": 6, # Value now limited to 6
}
)


@pv.on_put
def handle(pv, op):
pv.post(op.value()) # just store and update subscribers
op.done()


print("demo:pv:name: ", pv)
Server.forever(
providers=[
{
"demo:pv:name": pv,
}
]
)
173 changes: 173 additions & 0 deletions example/persist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Use a handler to automatically persist values to an SQLite3 file database.
Any values persisted this way will be automatically restored when the
program is rerun. The details of users who

Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the
program while continuing to monitor the PV. Compare with the value of
`demo:pv:uptime` which resets on each program start. Try setting the value of
demo:pv:optime while continuing to monitor it. It is recommended to
inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`.

There is an important caveat for this simple demo:
The `PersistHandler` will not work as expected if anything other than the
value of a field is changed, e.g. if a Control field was added to an NTScalar
if would not be persisted correctly. This could be resolved by correctly
merging the pv.current().raw and value.raw appropriately in the post()
and put().
"""

import json
import sqlite3
import time

from p4p import Value
from p4p.nt.scalar import NTScalar
from p4p.server import Server, ServerOperation
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV


class PersistHandler(Handler):
"""
A handler that will allow simple persistence of values and timestamps
across retarts. It requires a post handler in order to persist values
set within the program.
"""

def __init__(self, pv_name: str, conn: sqlite3.Connection):
self._conn = conn
self._pv_name = pv_name

def open(self, value, **kws):
# If there is a value already in the database we always use that
# instead of the supplied initial value, unless the
# handler_open_restore flag indicates otherwise.
if not kws.pop("handler_open_restore", True):
return

res = self._conn.execute(
"SELECT data FROM pvs WHERE id=?", [self._pv_name]
)
query_val = res.fetchone()

if query_val is not None:
# We could, in theory, re-apply authentication here based on the
# account and peer information.
json_val = json.loads(query_val[0])
print(f"Will restore to {self._pv_name} value: {json_val['value']}")

# Override initial value
value["value"] = json_val["value"]

value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][
"secondsPastEpoch"
]
value["timeStamp.nanoseconds"] = json_val["timeStamp"][
"nanoseconds"
]
else:
self._upsert(value)

def _update_timestamp(self, value):
if not value.changed("timeStamp"):
now = time.time()
value["timeStamp.secondsPastEpoch"] = now // 1
value["timeStamp.nanoseconds"] = int((now % 1) * 1e9)

def _upsert(self, value, account=None, peer=None) -> None:
# Persist the data
val_json = json.dumps(value.todict())

# Use UPSERT: https://sqlite.org/lang_upsert.html
conn.execute(
"""
INSERT INTO pvs (id, data, account, peer)
VALUES (:name, :json_data, :account, :peer)
ON CONFLICT(id)
DO UPDATE SET data = :json_data, account = :account, peer = :peer;
""",
{
"name": self._pv_name,
"json_data": val_json,
"account": account,
"peer": peer,
},
)
conn.commit()

def post(self, pv: SharedPV, value: Value, **kwargs):
self._update_timestamp(value)

self._upsert(
value,
account=kwargs.pop("handler_post_upsert_account", None),
peer=kwargs.pop("handler_post_upsert_peer", None),
)

def put(self, pv: SharedPV, op: ServerOperation):
value = op.value().raw

# The post takes care of updating the timestamp
pv.post(
op.value(),
handler_post_upsert_account=op.account(),
handler_post_upsert_peer=op.peer(),
)
op.done()


# Create an SQLite dayabase to function as our persistence store
conn = sqlite3.connect("persist_pvs.db", check_same_thread=False)
# conn.execute("DROP TABLE IF EXISTS pvs")
conn.execute(
"CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));"
) # IPv6 addresses can be long and will contain port number as well!

duplicate_pv = SharedPV(
nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12
)
pvs = {
"demo:pv:optime": SharedPV(
nt=NTScalar("i"),
handler=PersistHandler("demo:pv:optime", conn),
initial=0,
), # Operational time; total time running
"demo:pv:uptime": SharedPV(
nt=NTScalar("i"),
handler=PersistHandler("demo:pv:optime", conn),
timestamp=time.time(),
initial=0,
handler_open_restore=False,
), # Uptime since most recent (re)start
"demo:pv:int": duplicate_pv,
"demo:pv:float": SharedPV(
nt=NTScalar("d"),
handler=PersistHandler("demo:pv:float", conn),
initial=9.99,
),
"demo:pv:string": SharedPV(
nt=NTScalar("s"),
handler=PersistHandler("demo:pv:string", conn),
initial="Hello!",
),
"demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore
}

print(f"Starting server with the following PVs: {pvs}")

server = None
try:
server = Server(providers=[pvs])
while True:
time.sleep(1)
increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1
pvs["demo:pv:uptime"].post(increment_value)
increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1
pvs["demo:pv:optime"].post(increment_value)
except KeyboardInterrupt:
pass
finally:
if server:
server.stop()
conn.close()
Loading