diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 00000000..f4b8801b --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: 4c0ac0660cdd42b8acbddd695a2e685a +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 00000000..e69de29b diff --git a/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png b/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png new file mode 100644 index 00000000..e8f9d8c8 Binary files /dev/null and b/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png differ diff --git a/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png.map b/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png.map new file mode 100644 index 00000000..1efeb65c --- /dev/null +++ b/_images/graphviz-019d1949f1c9a290f30e505fcdce056c16aeb472.png.map @@ -0,0 +1,2 @@ + diff --git a/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png b/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png new file mode 100644 index 00000000..4fbf8cc4 Binary files /dev/null and b/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png differ diff --git a/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png.map b/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png.map new file mode 100644 index 00000000..4f7a46b5 --- /dev/null +++ b/_images/graphviz-22af22e7c09fd95f8bc28d79fa6666749aacf8ce.png.map @@ -0,0 +1,2 @@ + diff --git a/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png b/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png new file mode 100644 index 00000000..fd09362c Binary files /dev/null and b/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png differ diff --git a/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png.map 
b/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png.map new file mode 100644 index 00000000..26adf723 --- /dev/null +++ b/_images/graphviz-4dcebfccd2ce68d5093108f3f5eab93483966240.png.map @@ -0,0 +1,2 @@ + diff --git a/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png b/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png new file mode 100644 index 00000000..aa5c9160 Binary files /dev/null and b/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png differ diff --git a/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png.map b/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png.map new file mode 100644 index 00000000..2c22b3de --- /dev/null +++ b/_images/graphviz-b66706818f80f09045248e99cee994119fb0a213.png.map @@ -0,0 +1,2 @@ + diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 00000000..445b488b --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,125 @@ + + + + +
+ + +
+
+from __future__ import print_function
+
+import logging
+_log = logging.getLogger(__name__)
+
+import warnings
+import sys
+
+try:
+ from Queue import Queue, Full, Empty
+except ImportError:
+ from queue import Queue, Full, Empty
+
+from .. import _p4p
+from .._p4p import Cancelled, Disconnected, Finished, RemoteError
+
+from ..wrapper import Value, Type
+from ..nt import buildNT
+
+if sys.version_info >= (3, 0):
+ unicode = str
+
+__all__ = (
+ 'Subscription',
+ 'Context',
+ 'RemoteError',
+)
+
+
def unwrapHandler(handler, nt):
    """Wrap get/rpc handler to unwrap Value

    Returns a callable for the low-level _p4p operation which translates
    the (code, msg, val) triple into a single argument for the user
    supplied handler: either an Exception instance or an unwrapped Value.
    """
    # handler bound as a default argument to avoid late-binding surprises
    def dounwrap(code, msg, val, handler=handler):
        _log.debug("Handler (%s, %s, %r) -> %s", code, msg, val, handler)
        try:
            if code == 0:
                # remote/server side failure; msg is the error text
                handler(RemoteError(msg))
            elif code == 1:
                # operation cancelled locally
                handler(Cancelled())
            elif code == 2:  # exception during builder callback
                # val is a (type, args, traceback) triple captured earlier
                A, B, C = val
                if unicode is str:
                    E = A(B).with_traceback(C)  # py 3
                else:
                    E = A(B)  # py 2 (bye bye traceback...)
                handler(E)
            else:
                # success; val may be None (eg. put completion)
                if val is not None:
                    val = nt.unwrap(val)
                handler(val)
        except:
            # never propagate back into the extension caller
            _log.exception("Exception in Operation handler")
    return dounwrap
+
+
def monHandler(handler):
    """Adapt a user monitor callback for the low-level subscription.

    The returned zero-argument callable shields the extension caller
    from any exception raised by the user code.
    """
    def wakeup(handler=handler):
        _log.debug("Update %s", handler)
        try:
            handler()
        except:
            _log.exception("Exception in Monitor handler")
    return wakeup
+
+
def defaultBuilder(value, nt):
    """Reasonably sensible default handling of put builder

    A callable is used as-is; anything else is captured and copied
    into the Put value once the operation type is known.
    """
    if callable(value):
        return value

    def builder(V):
        if isinstance(value, Value):
            V[None] = value
        elif isinstance(value, dict):
            for field in value:
                V[field] = value[field]
        else:
            nt.assign(V, value)
    return builder
+
+
def wrapRequest(request):
    """Pass through None or an already-built Value; otherwise parse a
    pvRequest string into a Value.
    """
    if request is not None and not isinstance(request, Value):
        return Context.makeRequest(request)
    return request
+
+
class Subscription(_p4p.ClientMonitor):

    """Interface to monitor subscription FIFO

    Use method poll() to try to pop an item from the FIFO.
    None indicates the FIFO is empty, must wait for another Data event before
    calling poll() again.

    complete()==True after poll()==False indicates that no more updates will
    ever be forthcoming.  This is normal (not error) completion.

    cancel() aborts the subscription.
    """

    def __init__(self, context, name, nt, **kws):
        _log.debug("Subscription(%s)", kws)
        super(Subscription, self).__init__(context, name, **kws)
        self.context = context  # keep the Context alive while subscribed
        self._nt = nt           # NT helper used to unwrap Values
        self.done = False       # set once Finished has been popped

    def pop(self):
        # one item from the FIFO: None (empty), a Value, or an Exception
        val = super(Subscription, self).pop()
        assert val is None or isinstance(val, (Value, Exception)), val
        if isinstance(val, Value):
            val = self._nt.unwrap(val)
        elif isinstance(val, Finished):
            # normal completion; remembered for complete()
            self.done = True
        _log.debug("poll() -> %r", val)
        return val

    def complete(self):
        # True once the server has signalled Finished
        return self.done

    def __enter__(self):
        return self

    def __exit__(self, A, B, C):
        self.close()

    if unicode is str:
        # py3 only: close on GC.  Not defined for py2, where __del__
        # inhibits collection of reference cycles.
        def __del__(self):
            self.close()
+
class Context(object):

    """
    :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list.
    :param conf dict: Configuration to pass to provider. Depends on provider selected.
    :param bool useenv: Allow the provider to use configuration from the process environment.
    :param dict nt: Controls :ref:`unwrap`. None uses defaults. Set False to disable
    :param dict unwrap: Legacy :ref:`unwrap`.
    """

    def __init__(self, provider='pva', conf=None, useenv=None,
                 unwrap=None, nt=None,
                 **kws):
        self.name = provider
        super(Context, self).__init__(**kws)

        # helper translating between Value and more convenient python types
        self._nt = buildNT(nt, unwrap)

        # assign None first so close() is safe even if the provider
        # constructor below raises
        self._ctxt = None

        self._ctxt = _ClientProvider(provider, conf=conf, useenv=useenv)
        self.conf = self._ctxt.conf
        self.hurryUp = self._ctxt.hurryUp

    # expose the pvRequest string parser from the extension module
    makeRequest = _p4p.ClientProvider.makeRequest

    def close(self):
        # idempotent: repeated calls are no-ops
        if self._ctxt is None:
            return

        self._ctxt.close()
        self._ctxt = None

    def __enter__(self):
        return self

    def __exit__(self, A, B, C):
        self.close()

    def disconnect(self, name=None):
        """Clear internal Channel cache, allowing currently unused channels to be implicitly closed.

        :param str name: None, to clear the entire cache, or a name string to clear only a certain entry.
        """
        self._ctxt.disconnect(name)

    def _request(self, process=None, wait=None):
        """helper for building pvRequests

        :param str process: Control remote processing.  May be 'true', 'false', 'passive', or None.
        :param bool wait: Wait for all server processing to complete.
        """
        opts = []
        if process is not None:
            opts.append('process=%s' % process)
        if wait is not None:
            if wait:
                opts.append('wait=true')
            else:
                opts.append('wait=false')
        return 'field()record[%s]' % (','.join(opts))

    def get(self, name, handler, request=None):
        """Begin Fetch of current value of a PV

        :param name: A single name string or list of name strings
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param callable handler: Completion notification.  Called with a Value, RemoteError, or Cancelled

        :returns: A object with a method cancel() which may be used to abort the operation.
        """
        return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt),
                                pvRequest=wrapRequest(request), get=True, put=False)

    def put(self, name, handler, builder=None, request=None, get=True):
        """Write a new value to a PV.

        :param name: A single name string or list of name strings
        :param callable handler: Completion notification.  Called with None (success), RemoteError, or Cancelled
        :param callable builder: Called when the PV Put type is known.  A builder is responsible
                                 for filling in the Value to be sent.  builder(value)
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param bool get: Whether to do a Get before the Put.  If True then the value passed to the builder callable
                         will be initialized with recent PV values.  eg. use this with NTEnum to find the enumeration list.

        :returns: A object with a method cancel() which may be used to abort the operation.
        """
        return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt),
                                builder=defaultBuilder(builder, self._nt),
                                pvRequest=wrapRequest(request), get=get, put=True)

    def rpc(self, name, handler, value, request=None):
        """Perform RPC operation on PV

        :param name: A single name string or list of name strings
        :param callable handler: Completion notification.  Called with a Value, RemoteError, or Cancelled
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.

        :returns: A object with a method cancel() which may be used to abort the operation.
        """
        if value is None:
            # no arguments: send an empty structure
            value = Value(Type([]))
        return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt),
                                value=value, pvRequest=wrapRequest(request), rpc=True)

    def monitor(self, name, handler, request=None, **kws):
        """Begin subscription to named PV

        :param str name: PV name string
        :param callable handler: Completion notification.  Called with None (FIFO not empty), RemoteError, Cancelled, or Disconnected
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param bool notify_disconnect: Whether disconnect (and done) notifications are delivered to the callback (as None).

        :returns: A Subscription
        """
        return Subscription(self._ctxt, name,
                            nt=self._nt,
                            handler=monHandler(handler), pvRequest=wrapRequest(request),
                            **kws)
+
+
+
+
+set_debug = _p4p.logger_level_set
+
def _cleanup_contexts():
    """Close every live client context (interpreter shutdown helper)."""
    live = list(_p4p.all_providers)
    _log.debug("Closing %d Client contexts", len(live))
    for provider in live:
        provider.close()
+
class _ClientOperation(_p4p.ClientOperation):
    # py3 only: close explicitly when garbage collected.
    # Not defined for py2, where __del__ inhibits cycle collection.
    if unicode is str:
        def __del__(self):
            self.close()
+
class _ClientProvider(_p4p.ClientProvider):
    # py3 only: close explicitly when garbage collected.
    # Not defined for py2, where __del__ inhibits cycle collection.
    if unicode is str:
        def __del__(self):
            self.close()
+
+
+from __future__ import print_function
+
+import logging
+import sys
+_log = logging.getLogger(__name__)
+
+try:
+ from itertools import izip
+except ImportError:
+ izip = zip
+from functools import partial
+import json
+import threading
+
+try:
+ from Queue import Queue, Full, Empty
+except ImportError:
+ from queue import Queue, Full, Empty
+
+from . import raw
+from .raw import Disconnected, RemoteError, Cancelled, Finished
+from ..util import _defaultWorkQueue
+from ..wrapper import Value, Type
+from ..rpc import WorkQueue
+from .._p4p import (logLevelAll, logLevelTrace, logLevelDebug,
+ logLevelInfo, logLevelWarn, logLevelError,
+ logLevelFatal, logLevelOff)
+
+__all__ = [
+ 'Context',
+ 'Value',
+ 'Type',
+ 'RemoteError',
+ 'TimeoutError',
+]
+
if sys.version_info >= (3, 0):
    unicode = str
    # py3 has a builtin TimeoutError; re-bind it so it is exported via __all__
    TimeoutError = TimeoutError

else:
    # py2 has no TimeoutError builtin; provide a compatible stand-in
    class TimeoutError(RuntimeError):
        "Local timeout has expired"
        def __init__(self):
            RuntimeError.__init__(self, 'Timeout')
+
+
class Subscription(object):
    """An active subscription.

    Returned by `Context.monitor`.
    """

    def __init__(self, ctxt, name, cb, notify_disconnect=False, queue=None):
        self.name, self._S, self._cb = name, None, cb
        self._notify_disconnect = notify_disconnect
        # work queue on which user callbacks are executed
        self._Q = queue or ctxt._Q or _defaultWorkQueue()
        if notify_disconnect:
            # all subscriptions are initially disconnected
            self._Q.push_wait(partial(cb, Disconnected()))

    def close(self):
        """Close subscription.
        """
        if self._S is not None:
            # after .close() self._event should never be called
            self._S.close()
        self._S = None

    def __enter__(self):
        return self

    def __exit__(self, A, B, C):
        self.close()

    @property
    def done(self):
        'Has all data for this subscription been received?'
        return self._S is None or self._S.done()

    @property
    def empty(self):
        'Is data pending in event queue?'
        return self._S is None or self._S.empty()

    def _event(self):
        # wakeup from the low-level subscription; defer real work to the queue
        try:
            assert self._S is not None, self._S
            _log.debug('Subscription wakeup for %s', self.name)
            self._Q.push(self._handle)
        except:
            _log.exception("Lost Subscription update for %s", self.name)

    def _handle(self):
        # Runs on the work queue.  Drain at most 4 updates, then re-schedule
        # so that one busy subscription cannot starve the others.
        try:
            S = self._S
            if S is None:  # already close()'d
                return

            for n in range(4):
                E = S.pop()
                if E is None:
                    break  # monitor queue empty

                elif isinstance(E, Exception):
                    _log.debug('Subscription notify for %s with %s', self.name, E)
                    if self._notify_disconnect:
                        self._cb(E)

                    elif isinstance(E, RemoteError):
                        # user opted out of error delivery; log instead
                        _log.error("Subscription Error %s", E)

                    if isinstance(E, Finished):
                        # normal completion: tear down the low-level subscription
                        _log.debug('Subscription complete %s', self.name)
                        self._S = None
                        S.close()

                else:
                    # a plain data update
                    self._cb(E)

            if E is not None:
                # removed 4 elements without emptying queue
                # re-schedule to mux with others
                self._Q.push(self._handle)

        except:
            _log.exception("Error processing Subscription event for %s", self.name)
            if self._S is not None:
                self._S.close()
            self._S = None
+
+
class Context(raw.Context):

    """Context(provider, conf=None, useenv=True)

    :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list.
    :param dict conf: Configuration to pass to provider. Depends on provider selected.
    :param bool useenv: Allow the provider to use configuration from the process environment.
    :param int workers: Size of thread pool in which monitor callbacks are run. Default is 4
    :param int maxsize: Size of internal work queue used for monitor callbacks. Default is unlimited
    :param dict nt: Controls :ref:`unwrap`. None uses defaults. Set False to disable
    :param dict unwrap: Legacy :ref:`unwrap`.
    :param WorkQueue queue: A work queue through which monitor callbacks are dispatched.

    The methods of this Context will block the calling thread until completion or timeout

    The meaning, and allowed keys, of the configuration dictionary depend on the provider.
    conf= will override values taken from the process environment.  Pass useenv=False to
    ensure that environment variables are completely ignored.

    The "pva" provider understands the following keys:

    * EPICS_PVA_ADDR_LIST
    * EPICS_PVA_AUTO_ADDR_LIST
    * EPICS_PVA_SERVER_PORT
    * EPICS_PVA_BROADCAST_PORT
    """
    Value = Value

    name = ''
    "Provider name string"

    def __init__(self, provider='pva', conf=None, useenv=True, nt=None, unwrap=None,
                 maxsize=0, queue=None, workers=4):
        self._channel_lock = threading.Lock()

        super(Context, self).__init__(provider, conf=conf, useenv=useenv, nt=nt, unwrap=unwrap)

        # Fix: store the queue bound and worker count.  Previously maxsize was
        # accepted (and workers documented) but neither was saved, so _queue()
        # raised AttributeError on self._Qmax / self._Wcnt.
        self._Qmax = maxsize
        self._Wcnt = workers

        # lazy start threaded WorkQueue
        self._Q = self._T = None

        self._Q = queue

    def _channel(self, name):
        # serialize channel cache access
        with self._channel_lock:
            return super(Context, self)._channel(name)

    def disconnect(self, *args, **kws):
        # serialize channel cache access
        with self._channel_lock:
            super(Context, self).disconnect(*args, **kws)

    def _queue(self):
        """Lazily create the shared WorkQueue and its worker thread pool."""
        if self._Q is None:
            Q = WorkQueue(maxsize=self._Qmax)
            Ts = []
            for n in range(self._Wcnt):
                T = threading.Thread(name='p4p Context worker', target=Q.handle)
                T.daemon = True
                Ts.append(T)
            for T in Ts:
                T.start()
            _log.debug('Started %d Context worker', self._Wcnt)
            self._Q, self._T = Q, Ts
        return self._Q

    def close(self):
        """Force close all Channels and cancel all Operations
        """
        if self._Q is not None:
            # one interrupt per worker thread so each wakes and exits
            for T in self._T:
                self._Q.interrupt()
            for n, T in enumerate(self._T):
                _log.debug('Join Context worker %d', n)
                T.join()
            _log.debug('Joined Context workers')
            self._Q, self._T = None, None
        if not Context:
            # Python 2.7 GC removes Context from scope during destruction of objects.
            return
        super(Context, self).close()

    def get(self, name, request=None, timeout=5.0, throw=True):
        """Fetch current value of some number of PVs.

        :param name: A single name string or list of name strings
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param float timeout: Operation timeout in seconds
        :param bool throw: When true, operation error throws an exception.  If False then the Exception is returned instead of the Value

        :returns: A p4p.Value or Exception, or list of same.  Subject to :py:ref:`unwrap`.

        When invoked with a single name then returns is a single value.
        When invoked with a list of name, then returns a list of values

        >>> ctxt = Context('pva')
        >>> V = ctxt.get('pv:name')
        >>> A, B = ctxt.get(['pv:1', 'pv:2'])
        >>>
        """
        singlepv = isinstance(name, (bytes, unicode))
        if singlepv:
            name = [name]
            request = [request]

        elif request is None:
            request = [None] * len(name)

        assert len(name) == len(request), (name, request)

        # use Queue instead of Event to allow KeyboardInterrupt
        done = Queue()
        result = [TimeoutError()] * len(name)
        ops = [None] * len(name)

        raw_get = super(Context, self).get

        try:
            for i, (N, req) in enumerate(izip(name, request)):
                # completion callback; i bound as default argument
                def cb(value, i=i):
                    try:
                        if not isinstance(value, Cancelled):
                            done.put_nowait((value, i))
                        _log.debug('get %s Q %r', N, value)
                    except:
                        _log.exception("Error queuing get result %s", value)

                _log.debug('get %s w/ %s', N, req)
                ops[i] = raw_get(N, cb, request=req)

            for _n in range(len(name)):
                try:
                    value, i = done.get(timeout=timeout)
                except Empty:
                    if throw:
                        _log.debug('timeout %s after %s', name[i], timeout)
                        raise TimeoutError()
                    break
                _log.debug('got %s %r', name[i], value)
                if throw and isinstance(value, Exception):
                    raise value
                result[i] = value

        finally:
            # cancel any in-progress operations
            [op and op.close() for op in ops]

        if singlepv:
            return result[0]
        else:
            return result

    def put(self, name, values, request=None, timeout=5.0, throw=True,
            process=None, wait=None, get=True):
        """Write a new value of some number of PVs.

        :param name: A single name string or list of name strings
        :param values: A single value, a list of values, a dict, a `Value`.  May be modified by the constructor nt= argument.
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param float timeout: Operation timeout in seconds
        :param bool throw: When true, operation error throws an exception.
                           If False then the Exception is returned instead of the Value
        :param str process: Control remote processing.  May be 'true', 'false', 'passive', or None.
        :param bool wait: Wait for all server processing to complete.
        :param bool get: Whether to do a Get before the Put.  If True then the value passed to the builder callable
                         will be initialized with recent PV values.  eg. use this with NTEnum to find the enumeration list.

        :returns: A None or Exception, or list of same

        When invoked with a single name then returns is a single value.
        When invoked with a list of name, then returns a list of values

        If 'wait' or 'process' is specified, then 'request' must be omitted or None.

        >>> ctxt = Context('pva')
        >>> ctxt.put('pv:name', 5.0)
        >>> ctxt.put(['pv:1', 'pv:2'], [1.0, 2.0])
        >>> ctxt.put('pv:name', {'value':5})
        >>>

        The provided value(s) will be automatically coerced to the target type.
        If this is not possible then an Exception is raised/returned.

        Unless the provided value is a dict, it is assumed to be a plain value
        and an attempt is made to store it in '.value' field.
        """
        singlepv = isinstance(name, (bytes, unicode))
        if request and (process or wait is not None):
            raise ValueError("request= is mutually exclusive to process= or wait=")
        elif process or wait is not None:
            request = 'field()record[block=%s,process=%s]' % ('true' if wait else 'false', process or 'passive')
            if not singlepv:
                request = [request]*len(name)

        if singlepv:
            name = [name]
            values = [values]
            request = [request]

        elif request is None:
            request = [None] * len(name)

        assert len(name) == len(request), (name, request)
        assert len(name) == len(values), (name, values)

        # use Queue instead of Event to allow KeyboardInterrupt
        done = Queue()
        result = [TimeoutError()] * len(name)
        ops = [None] * len(name)

        raw_put = super(Context, self).put

        try:
            for i, (n, value, req) in enumerate(izip(name, values, request)):
                # strings beginning with '{' are treated as json encoded
                if isinstance(value, (bytes, unicode)) and value[:1] == '{':
                    try:
                        value = json.loads(value)
                    except ValueError:
                        raise ValueError("Unable to interpret '%s' as json" % value)

                # completion callback
                def cb(value, i=i):
                    try:
                        done.put_nowait((value, i))
                    except:
                        _log.exception("Error queuing put result %r", value)

                ops[i] = raw_put(n, cb, builder=value, request=req, get=get)

            for _n in range(len(name)):
                try:
                    value, i = done.get(timeout=timeout)
                except Empty:
                    if throw:
                        raise TimeoutError()
                    break
                if throw and isinstance(value, Exception):
                    raise value
                result[i] = value

            if singlepv:
                return result[0]
            else:
                return result
        finally:
            # cancel any in-progress operations
            [op and op.close() for op in ops]

    def rpc(self, name, value, request=None, timeout=5.0, throw=True):
        """Perform a Remote Procedure Call (RPC) operation

        :param str name: PV name string
        :param Value value: Arguments.  Must be Value instance
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param float timeout: Operation timeout in seconds
        :param bool throw: When true, operation error throws an exception.
                           If False then the Exception is returned instead of the Value

        :returns: A Value or Exception.  Subject to :py:ref:`unwrap`.

        >>> ctxt = Context('pva')
        >>> ctxt.rpc('pv:name:add', {'A':5, 'B': 6})
        >>>

        The provided value(s) will be automatically coerced to the target type.
        If this is not possible then an Exception is raised/returned.

        Unless the provided value is a dict, it is assumed to be a plain value
        and an attempt is made to store it in '.value' field.
        """
        # use Queue instead of Event to allow KeyboardInterrupt
        done = Queue()

        op = super(Context, self).rpc(name, done.put_nowait, value, request=request)

        try:
            try:
                result = done.get(timeout=timeout)
            except Empty:
                result = TimeoutError()
            if throw and isinstance(result, Exception):
                raise result

            return result
        except:
            op.close()
            raise

    def monitor(self, name, cb, request=None, notify_disconnect=False, queue=None):
        """Create a subscription.

        :param str name: PV name string
        :param callable cb: Processing callback
        :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default.
        :param bool notify_disconnect: In additional to Values, the callback may also be call with instances of Exception.
                                       Specifically: Disconnected , RemoteError, or Cancelled
        :param WorkQueue queue: A work queue through which monitor callbacks are dispatched.
        :returns: a :py:class:`Subscription` instance

        The callable will be invoked with one argument which is either.

        * A p4p.Value (Subject to :py:ref:`unwrap`)
        * A sub-class of Exception (Disconnected , RemoteError, or Cancelled)
        """
        R = Subscription(self, name, cb, notify_disconnect=notify_disconnect, queue=queue)

        R._S = super(Context, self).monitor(name, R._event, request)
        return R
+
+
+import logging
+_log = logging.getLogger(__name__)
+
+try:
+ from itertools import izip
+except ImportError:
+ izip = zip
+
+from collections import OrderedDict
+from ..wrapper import Type, Value
+from .common import timeStamp, alarm, NTBase
+from .scalar import NTScalar
+from .ndarray import NTNDArray
+from .enum import NTEnum
+
+__all__ = [
+ 'NTScalar',
+ 'NTEnum',
+ 'NTMultiChannel',
+ 'NTTable',
+ 'NTNDArray',
+ 'defaultNT',
+]
+
# Default mapping of structure type ID -> NT helper class, used by buildNT()
_default_nt = {
    "epics:nt/NTScalar:1.0": NTScalar,
    "epics:nt/NTScalarArray:1.0": NTScalar,
    "epics:nt/NTEnum:1.0": NTEnum,
    "epics:nt/NTNDArray:1.0": NTNDArray,
}
+
def defaultNT():
    """Returns a copy of the default NT helper mappings.

    :since: 3.1.0
    """
    return dict(_default_nt)
+
class UnwrapOnly(object):
    """Adapter for legacy unwrap= functions: unwraps values but never wraps."""

    def __init__(self, unwrap):
        self.unwrap = unwrap

    def __call__(self):
        # stateless, so "instantiation" just returns self
        return self

    def wrap(self, V):
        # pass Values through unchanged
        return V
+
def buildNT(nt=None, unwrap=None):
    """Construct a ClientUnwrapper from nt= / legacy unwrap= arguments."""
    if nt is False or unwrap is False:
        # disable use of wrappers entirely
        return ClientUnwrapper({})

    if unwrap is None:
        # new style: defaults overridden by nt=
        mapping = dict(_default_nt)
        mapping.update(nt or {})
    else:
        # legacy style: nt= is ignored
        mapping = {}
        for ID, fn in (unwrap or {}).items():
            mapping[ID] = UnwrapOnly(fn)

    return ClientUnwrapper(mapping)
+
class ClientUnwrapper(object):
    """Caches the NT helper matching the most recently seen structure type
    ID, and delegates wrap/unwrap/assign to it.  Unknown IDs pass values
    through unchanged.
    """

    def __init__(self, nt=None):
        self.nt = nt
        self.id = None  # type ID of the currently selected helper
        self._wrap = self._unwrap = lambda x: x
        self._assign = self._default_assign

    def wrap(self, val, **kws):
        """Pack a arbitrary python object into a Value
        """
        return self._wrap(val, **kws)

    def unwrap(self, val):
        """Unpack a Value as some other python type
        """
        if self.id != val.getID():
            self._update(val)
        return self._unwrap(val)

    def assign(self, V, value):
        if self.id != V.getID():
            self._update(V)
        self._assign(V, value)

    def _update(self, val):
        # type ID changed since last call; select the matching helper
        helper = self.nt.get(val.getID())
        if helper is None:
            # unknown ID: reset to pass-through
            self._wrap = self._unwrap = lambda x: x
            self._assign = self._default_assign
        else:
            inst = helper()  # instantiate
            self._wrap, self._unwrap = inst.wrap, inst.unwrap
            self._assign = inst.assign
            self.id = val.getID()

    def _default_assign(self, V, value):
        # assume NTScalar-like
        V.value = value

    def __str__(self):
        return '%s(%s)' % (self.__class__.__name__, repr(self.nt))
    __repr__ = __str__
+
class NTMultiChannel(NTBase):

    """Describes a structure holding the equivalent of a number of NTScalar
    """
    Value = Value

    @staticmethod
    def buildType(valtype, extra=[]):
        """Build a Type

        :param str valtype: A type code to be used with the 'value' field.  Must be an array
        :param list extra: A list of tuples describing additional non-standard fields
        :returns: A :py:class:`Type`
        """
        assert valtype[:1] == 'a', 'valtype must be an array'
        return Type(id="epics:nt/NTMultiChannel:1.0",
                    spec=[
                        ('value', valtype),
                        ('channelName', 'as'),
                        ('descriptor', 's'),
                        ('alarm', alarm),
                        ('timeStamp', timeStamp),
                        # per-channel meta-data, parallel to 'value'
                        ('severity', 'ai'),
                        ('status', 'ai'),
                        ('message', 'as'),
                        ('secondsPastEpoch', 'al'),
                        ('nanoseconds', 'ai'),
                        ('userTag', 'ai'),
                        ('isConnected', 'a?'),
                    ] + extra)
+
+
class NTTable(NTBase):

    """A generic table

    >>> table = NTTable.buildType(columns=[
        ('columnA', 'ai'),
        ('columnB', 'as'),
    ])
    """
    Value = Value

    @staticmethod
    def buildType(columns=[], extra=[]):
        """Build a table

        :param list columns: List of column names and types.  eg [('colA', 'd')]
        :param list extra: A list of tuples describing additional non-standard fields
        :returns: A :py:class:`Type`
        """
        return Type(id="epics:nt/NTTable:1.0",
                    spec=[
                        ('labels', 'as'),
                        ('value', ('S', None, columns)),
                        ('descriptor', 's'),
                        ('alarm', alarm),
                        ('timeStamp', timeStamp),
                    ] + extra)

    def __init__(self, columns=[], extra=[]):
        self.labels = []
        C = []
        # each column becomes an array field; scalar type codes only
        for col, type in columns:
            if type[0] == 'a':
                raise ValueError("NTTable column types may not be array")
            C.append((col, 'a' + type))
            self.labels.append(col)
        self.type = self.buildType(C, extra=extra)

    def wrap(self, values, **kws):
        """Pack an iterable of dict into a Value

        >>> T=NTTable([('A', 'ai'), ('B', 'as')])
        >>> V = T.wrap([
            {'A':42, 'B':'one'},
            {'A':43, 'B':'two'},
        ])
        """
        if isinstance(values, Value):
            return values
        else:
            # transpose the row-oriented input into per-column lists
            cols = dict([(L, []) for L in self.labels])
            try:
                # unzip list of dict
                for V in values:
                    for L in self.labels:
                        try:
                            cols[L].append(V[L])
                        except (IndexError, KeyError):
                            pass
                # allow omit empty columns
                for L in self.labels:
                    V = cols[L]
                    if len(V) == 0:
                        del cols[L]

                try:
                    values = self.Value(self.type, {
                        'labels': self.labels,
                        'value': cols,
                    })
                except:
                    _log.error("Failed to encode '%s' with %s", cols, self.labels)
                    raise
            except:
                _log.exception("Failed to wrap: %s", values)
                raise
        return self._annotate(values, **kws)

    @staticmethod
    def unwrap(value):
        """Iterate an NTTable

        :returns: An iterator yielding an OrderedDict for each column
        """
        ret = []

        # build lists of column names, and value
        lbl, cols = [], []
        for cname, cval in value.value.items():
            lbl.append(cname)
            cols.append(cval)

        # zip together column arrays to iterate over rows
        for rval in izip(*cols):
            # zip together column names and row values
            ret.append(OrderedDict(zip(lbl, rval)))

        return ret
+
class NTURI(object):
    """Helper for building NTURI structures (normative-type RPC arguments)."""

    @staticmethod
    def buildType(args):
        """Build NTURI

        :param list args: A list of tuples of query argument name and PVD type code.

        >>> I = NTURI([
            ('arg_a', 'I'),
            ('arg_two', 's'),
        ])
        """
        try:
            return Type(id="epics:nt/NTURI:1.0", spec=[
                ('scheme', 's'),
                ('authority', 's'),
                ('path', 's'),
                ('query', ('S', None, args)),
            ])
        except Exception:
            # re-raise with the argument list, which is the useful context
            raise ValueError('Unable to build NTURI compatible type from %s' % args)

    def __init__(self, args):
        self._args = list(args)
        self.type = self.buildType(args)

    def wrap(self, path, args=(), kws=None, scheme='', authority=''):
        """Wrap argument values (tuple/list with optional dict) into Value

        :param str path: The PV name to which this call is made
        :param tuple args: Ordered arguments
        :param dict kws: Keyword arguments
        :rtype: Value
        """
        # fix: avoid a mutable default argument for kws
        kws = kws or {}
        # build dict of argument name+value
        AV = {}
        AV.update([A for A in kws.items() if A[1] is not None])
        AV.update([(N, V) for (N, _T), V in zip(self._args, args)])

        # list of argument name+type tuples for which a value was provided
        AT = [A for A in self._args if A[0] in AV]

        # build a reduced type containing only the supplied arguments
        T = self.buildType(AT)
        try:
            return Value(T, {
                'scheme': scheme,
                'authority': authority,
                'path': path,
                'query': AV,
            })
        except Exception:
            raise ValueError('Unable to initialize NTURI %s from %s using %s' % (AT, AV, self._args))
+
+"""Helper for handling NTNDArray a la. areaDetector.
+
+Known attributes
+
+"ColorMode" (inner-most left, as given in NDArray.cpp, numpy.ndarray.shape is reversed from this)
+ 0 - Mono [Nx, Ny]
+ 1 - Bayer [Nx, Ny]
+ 2 - RGB1 [3, Nx, Ny]
+ 3 - RGB2 [Nx, 3, Ny]
+ 4 - RGB3 [Nx, Ny, 3]
+ 5 - YUV444 ?
+ 6 - YUV422 ??
+ 7 - YUV411 ???
+"""
+
+import logging
+_log = logging.getLogger(__name__)
+
+import time
+import numpy
+
+from ..wrapper import Type, Value
+from .common import alarm, timeStamp, NTBase
+
+from .scalar import ntwrappercommon
+
+
class ntndarray(ntwrappercommon, numpy.ndarray):
    """
    Augmented numpy.ndarray with additional attributes

    * .attrib    - dictionary
    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.

    Keys in the ``attrib`` dictionary may be any python which may be stored in a PVA field,
    including an arbitrary ``Value``.
    However, special handling is attempted if the provided ``Value`` appears to be an NTScalar
    or similar, in which case the .value, .alarm and .timeStamp are unpacked to the NTAttribute
    and other fields are discarded.
    """

    def __init__(self, *args, **kws):
        super(ntndarray, self).__init__(*args, **kws)
        # per-instance NDArray attribute dictionary
        self.attrib = {}

    def _store(self, value):
        # populate common meta-data (alarm/timeStamp/raw) first
        ntwrappercommon._store(self, value)
        self.attrib = {}
        for elem in value.get('attribute', []):
            self.attrib[elem.name] = elem.value

        # we will fail if dimension[] contains None s, or if
        # the advertised shape isn't consistent with the pixel array length.
        shape = [D.size for D in value.dimension]
        # NDArray dimension order is inner-most first; numpy is the reverse
        shape.reverse()

        # in-place reshape!  Isn't numpy fun
        self.shape = shape or [0]  # can't reshape if 0-d, so treat as 1-d if no dimensions provided

        return self
+
+
+[docs]class NTNDArray(NTBase):
+ """Representation of an N-dimensional array with meta-data
+
+ Translates into `ntndarray`
+ """
+ Value = Value
+ ntndarray = ntndarray
+
+ # map numpy.dtype.char to .value union member name
+ _code2u = {
+ '?':'booleanValue',
+ 'b':'byteValue',
+ 'h':'shortValue',
+ 'i':'intValue',
+ 'l':'longValue',
+ 'B':'ubyteValue',
+ 'H':'ushortValue',
+ 'I':'uintValue',
+ 'L':'ulongValue',
+ 'f':'floatValue',
+ 'd':'doubleValue',
+ }
+
+[docs] @classmethod
+ def buildType(klass, extra=[]):
+ """Build type
+ """
+ ret = klass._default_type
+ if extra:
+ L = ret.aspy()
+ L.extend(extra)
+ ret = Type(L, ret.getID())
+ return ret
+
+ _default_type = Type([
+ ('value', ('U', None, [
+ ('booleanValue', 'a?'),
+ ('byteValue', 'ab'),
+ ('shortValue', 'ah'),
+ ('intValue', 'ai'),
+ ('longValue', 'al'),
+ ('ubyteValue', 'aB'),
+ ('ushortValue', 'aH'),
+ ('uintValue', 'aI'),
+ ('ulongValue', 'aL'),
+ ('floatValue', 'af'),
+ ('doubleValue', 'ad'),
+ ])),
+ ('codec', ('S', 'codec_t', [
+ ('name', 's'),
+ ('parameters', 'v'),
+ ])),
+ ('compressedSize', 'l'),
+ ('uncompressedSize', 'l'),
+ ('uniqueId', 'i'),
+ ('dataTimeStamp', timeStamp),
+ ('alarm', alarm),
+ ('timeStamp', timeStamp),
+ ('dimension', ('aS', 'dimension_t', [
+ ('size', 'i'),
+ ('offset', 'i'),
+ ('fullSize', 'i'),
+ ('binning', 'i'),
+ ('reverse', '?'),
+ ])),
+ ('attribute', ('aS', 'epics:nt/NTAttribute:1.0', [
+ ('name', 's'),
+ ('value', 'v'),
+ ('tags', 'as'),
+ ('descriptor', 's'),
+ ('alarm', alarm),
+ ('timeStamp', timeStamp),
+ ('sourceType', 'i'),
+ ('source', 's'),
+ ])),
+ ], id='epics:nt/NTNDArray:1.0')
+
+ def __init__(self, **kws):
+ self.type = self.buildType(**kws)
+
+[docs] def wrap(self, value, **kws):
+ """Wrap numpy.ndarray as Value
+ """
+ attrib = getattr(value, 'attrib', None) or kws.pop('attrib', None) or {}
+
+ value = numpy.asarray(value) # loses any special/augmented attributes
+ dims = value.shape
+
+ if 'ColorMode' not in attrib:
+ # attempt to infer color mode from shape
+ if value.ndim==2:
+ attrib['ColorMode'] = 0 # gray
+
+ elif value.ndim==3:
+ for idx,dim in enumerate(reversed(dims)): # inner-most sent as left
+ if dim==3: # assume it's a color
+ attrib['ColorMode'] = 2 + idx # 2 - RGB1, 3 - RGB2, 4 - RGB3
+ break # assume that the first is color, and any subsequent dim=3 is a thin ROI
+ else:
+ raise ValueError("Unable to deduce color dimension from shape %r"%dims)
+
+ dataSize = value.nbytes
+
+ return self._annotate(Value(self.type, {
+ 'value': (self._code2u[value.dtype.char], value.flatten()),
+ 'compressedSize': dataSize,
+ 'uncompressedSize': dataSize,
+ 'uniqueId': 0,
+ 'attribute': [translateNDAttribute(K,V) for K, V in attrib.items()],
+ 'dimension': [{'size': N,
+ 'offset': 0,
+ 'fullSize': N,
+ 'binning': 1,
+ 'reverse': False} for N in reversed(dims)],
+ }), **kws)
+
+[docs] @classmethod
+ def unwrap(klass, value):
+ """Unwrap Value as NTNDArray
+ """
+ V = value.value
+ if V is None:
+ # Union empty. treat as zero-length char array
+ V = numpy.zeros((0,), dtype=numpy.uint8)
+ return V.view(klass.ntndarray)._store(value)
+
+
+
+def translateNDAttribute(name, value):
+ if isinstance(value, Value) and 'value' in value: # assume to be NT-like
+ V = {
+ 'name': name,
+ 'value': value['value'],
+ }
+ if 'alarm' in value:
+ V['alarm'] = value['alarm']
+ if 'timeStamp' in value:
+ V['timeStamp'] = value['timeStamp']
+ return V
+
+ return {'name': name, 'value': value}
+
+
+import time
+import sys
+import numpy
+
+from ..wrapper import Type, Value
+from .common import alarm, timeStamp, NTBase
+
+if sys.version_info >= (3, 0):
+ unicode = str
+
+
+class ntwrappercommon(object):
+ raw = timestamp = None
+
+ def _store(self, value):
+ assert isinstance(value, Value), value
+ self.raw = value
+ self.severity = value.get('alarm.severity', 0)
+ self.status = value.get('alarm.status', 0)
+ S, NS = value.get('timeStamp.secondsPastEpoch', 0), value.get('timeStamp.nanoseconds', 0)
+ self.raw_stamp = S, NS
+ self.timestamp = S + NS * 1e-9
+ # TODO: unpack display/control
+ return self
+
+ def __str__(self):
+ return '%s %s' % (time.ctime(self.timestamp), self.__repr__())
+
+ tostr = __str__
+
+[docs]class ntfloat(ntwrappercommon, float):
+ """
+ Augmented float with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+
+
+[docs]class ntint(ntwrappercommon, int):
+ """
+ Augmented integer with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+
+
+class ntbool(ntwrappercommon, int):
+ """
+ Augmented boolean with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+ def __new__(cls, value):
+ return int.__new__(cls, bool(value))
+
+ def __repr__(self):
+ return bool(self).__repr__().lower()
+
+
+[docs]class ntstr(ntwrappercommon, unicode):
+ """
+ Augmented string with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+
+
+[docs]class ntnumericarray(ntwrappercommon, numpy.ndarray):
+ """
+ Augmented numpy.ndarray with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+
+ @classmethod
+ def build(klass, val):
+ assert len(val.shape) == 1, val.shape
+ # clone
+ return klass(shape=val.shape, dtype=val.dtype, buffer=val.data,
+ strides=val.strides)
+
+
+[docs]class ntstringarray(ntwrappercommon, list):
+ """
+ Augmented list of strings with additional attributes
+
+ * .severity
+ * .status
+ * .timestamp - Seconds since 1 Jan 1970 UTC as a float
+ * .raw_stamp - A tuple (seconds, nanoseconds)
+ * .raw - The underlying :py:class:`p4p.Value`.
+ """
+
+def _metaHelper(F, valtype, display=False, control=False, valueAlarm=False, form=False):
+ isnumeric = valtype[-1:] not in '?su'
+ if display and isnumeric:
+ F.extend([
+ ('display', ('S', None, [
+ ('limitLow', valtype[-1:]),
+ ('limitHigh', valtype[-1:]),
+ ('description', 's'),
+ ('precision', 'i'),
+ ('form', ('S', 'enum_t', [
+ ('index', 'i'),
+ ('choices', 'as'),
+ ])),
+ ('units', 's'),
+ ] if form else [
+ ('limitLow', valtype[-1:]),
+ ('limitHigh', valtype[-1:]),
+ ('description', 's'),
+ ('format', 's'),
+ ('units', 's'),
+ ])),
+ ])
+ elif display and not isnumeric:
+ F.extend([
+ ('display', ('S', None, [
+ ('description', 's'),
+ ('units', 's'),
+ ])),
+ ])
+ if control and isnumeric:
+ F.extend([
+ ('control', ('S', None, [
+ ('limitLow', valtype[-1:]),
+ ('limitHigh', valtype[-1:]),
+ ('minStep', valtype[-1:]),
+ ])),
+ ])
+ if valueAlarm and isnumeric:
+ F.extend([
+ ('valueAlarm', ('S', None, [
+ ('active', '?'),
+ ('lowAlarmLimit', valtype[-1:]),
+ ('lowWarningLimit', valtype[-1:]),
+ ('highWarningLimit', valtype[-1:]),
+ ('highAlarmLimit', valtype[-1:]),
+ ('lowAlarmSeverity', 'i'),
+ ('lowWarningSeverity', 'i'),
+ ('highWarningSeverity', 'i'),
+ ('highAlarmSeverity', 'i'),
+ ('hysteresis', 'd'),
+ ])),
+ ])
+
+[docs]class NTScalar(NTBase):
+
+ """Describes a single scalar or array of scalar values and associated meta-data
+
+ >>> stype = NTScalar('d') # scalar double
+ >>> V = stype.wrap(4.2)
+ >>> assert isinstance(V, Value)
+
+ >>> stype = NTScalar.buildType('ad') # vector double
+ >>> V = Value(stype, {'value': [4.2, 4.3]})
+
+ The result of `wrap()` is an augmented value object combining
+ `ntwrappercommon` and a python value type (`str`, `int`, `float`, `numpy.ndarray`).
+
+ Agumented values have some additional attributes including:
+
+ * .timestamp - The update timestamp is a float representing seconds since 1 jan 1970 UTC.
+ * .raw_stamp - A tuple of (seconds, nanoseconds)
+ * .severity - An integer in the range [0, 3]
+ * .raw - The complete underlying :class:`~p4p.Value`
+
+ :param str valtype: A type code to be used with the 'value' field. See :ref:`valuecodes`
+ :param list extra: A list of tuples describing additional non-standard fields
+ :param bool display: Include optional fields for display meta-data
+ :param bool control: Include optional fields for control meta-data
+ :param bool valueAlarm: Include optional fields for alarm level meta-data
+ :param bool form: Include ``display.form`` instead of the deprecated ``display.format``.
+ """
+ Value = Value
+
+[docs] @staticmethod
+ def buildType(valtype, extra=[], *args, **kws):
+ """Build a Type
+
+ :param str valtype: A type code to be used with the 'value' field. See :ref:`valuecodes`
+ :param list extra: A list of tuples describing additional non-standard fields
+ :param bool display: Include optional fields for display meta-data
+ :param bool control: Include optional fields for control meta-data
+ :param bool valueAlarm: Include optional fields for alarm level meta-data
+ :param bool form: Include ``display.form`` instead of the deprecated ``display.format``.
+ :returns: A :py:class:`Type`
+ """
+ isarray = valtype[:1] == 'a'
+ F = [
+ ('value', valtype),
+ ('alarm', alarm),
+ ('timeStamp', timeStamp),
+ ]
+ _metaHelper(F, valtype, *args, **kws)
+ F.extend(extra)
+ return Type(id="epics:nt/NTScalarArray:1.0" if isarray else "epics:nt/NTScalar:1.0",
+ spec=F)
+
+ def __init__(self, valtype='d', **kws):
+ self.type = self.buildType(valtype, **kws)
+
+[docs] def wrap(self, value, **kws):
+ """Pack python value into Value
+
+ Accepts dict to explicitly initialize fields by name.
+ Any other type is assigned to the 'value' field.
+ """
+ if isinstance(value, Value):
+ pass
+ elif isinstance(value, ntwrappercommon):
+ kws.setdefault('timestamp', value.timestamp)
+ value = value.raw
+ elif isinstance(value, dict):
+ value = self.Value(self.type, value)
+ else:
+ value = self.Value(self.type, {'value': value})
+
+ return self._annotate(value, **kws)
+
+ typeMap = {
+ bool: ntbool,
+ int: ntint,
+ float: ntfloat,
+ unicode: ntstr,
+ numpy.ndarray: ntnumericarray.build,
+ list: ntstringarray,
+ }
+
+[docs] @classmethod
+ def unwrap(klass, value):
+ """Unpack a Value into an augmented python type (selected from the 'value' field)
+ """
+ assert isinstance(value, Value), value
+ V = value.value
+ try:
+ T = klass.typeMap[type(V)]
+ except KeyError:
+ raise ValueError("Can't unwrap value of type %s" % type(V))
+ try:
+ return T(value.value)._store(value)
+ except Exception as e:
+ raise ValueError("Can't construct %s around %s (%s): %s" % (T, value, type(value), e))
+
+
+
+if sys.version_info < (3, 0):
+ class ntlong(ntwrappercommon, long):
+ pass
+
+ NTScalar.typeMap[long] = ntlong
+
+
+import sys
+import logging
+import inspect
+from functools import wraps, partial
+_log = logging.getLogger(__name__)
+
+from .wrapper import Value, Type
+from .nt import NTURI
+from .client.raw import RemoteError
+from .server import DynamicProvider
+from .server.raw import SharedPV
+from .util import ThreadedWorkQueue, WorkQueue, Full, Empty
+
+__all__ = [
+ 'rpc',
+ 'rpccall',
+ 'rpcproxy',
+ 'RemoteError',
+ 'WorkQueue',
+ 'NTURIDispatcher',
+]
+
+
+[docs]def rpc(rtype=None):
+ """Decorator marks a method for export.
+
+ :param type: Specifies which :py:class:`Type` this method will return.
+
+ The return type (rtype) must be one of:
+
+ - An instance of :py:class:`p4p.Type`
+ - None, in which case the method must return a :py:class:`p4p.Value`
+ - One of the NT helper classes (eg :py:class:`p4p.nt.NTScalar`).
+ - A list or tuple used to construct a :py:class:`p4p.Type`.
+
+ Exported methods raise an :py:class:`Exception` to indicate an error to the remote caller.
+ :py:class:`RemoteError` may be raised to send a specific message describing the error condition.
+
+ >>> class Example(object):
+ @rpc(NTScalar.buildType('d'))
+ def add(self, lhs, rhs):
+ return {'value':float(lhs)+flost(rhs)}
+ """
+ wrap = None
+ if rtype is None or isinstance(rtype, Type):
+ pass
+ elif isinstance(type, (list, tuple)):
+ rtype = Type(rtype)
+ elif hasattr(rtype, 'type'): # eg. one of the NT* helper classes
+ wrap = rtype.wrap
+ rtype = rtype.type
+ else:
+ raise TypeError("Not supported")
+
+ def wrapper(fn):
+ if wrap is not None:
+ orig = fn
+
+ @wraps(orig)
+ def wrapper2(*args, **kws):
+ return wrap(orig(*args, **kws))
+ fn = wrapper2
+
+ fn._reply_Type = rtype
+ return fn
+ return wrapper
+
+
+[docs]def rpccall(pvname, request=None, rtype=None):
+ """Decorator marks a client proxy method.
+
+ :param str pvname: The PV name, which will be formated using the 'format' argument of the proxy class constructor.
+ :param request: A pvRequest string or :py:class:`p4p.Value` passed to eg. :py:meth:`p4p.client.thread.Context.rpc`.
+
+ The method to be decorated must have all keyword arguments,
+ where the keywords are type code strings or :class:`~p4p.Type`.
+
+ """
+ def wrapper(fn):
+ fn._call_PV = pvname
+ fn._call_Request = request
+ fn._reply_Type = rtype
+ return fn
+ return wrapper
+
+
+class RPCDispatcherBase(DynamicProvider):
+ def __init__(self, queue, target=None, channels=set(), name=None):
+ DynamicProvider.__init__(self, name, self) # we are our own Handler
+ self.queue = queue
+ self.target = target
+ self.channels = set(channels)
+ self.name = name
+ self.__pv = SharedPV(
+ handler=self, # no per-channel state, and only RPC used, so only need on PV
+ initial=Value(Type([])), # we don't support get/put/monitor, so use empty struct
+ )
+ M = self.methods = {}
+ for name, mem in inspect.getmembers(target):
+ if not hasattr(mem, '_reply_Type'):
+ continue
+ M[name] = mem
+
+ def getMethodNameArgs(self, request):
+ raise NotImplementedError("Sub-class must implement getMethodName")
+ # sub-class needs to extract method name from request
+ # return 'name', {'var':'val'}
+
+ def testChannel(self, name):
+ _log.debug("Test RPC channel %s = %s", name, name in self.channels)
+ return name in self.channels
+
+ def makeChannel(self, name, src):
+ if self.testChannel(name):
+ _log.debug("Open RPC channel %s", name)
+ return self.__pv # no per-channel tracking needed
+ else:
+ _log.warn("Ignore RPC channel %s", name)
+
+ def rpc(self, pv, op):
+ _log.debug("RPC call %s", op)
+ try:
+ self.queue.push(partial(self._handle, op))
+ except Full:
+ _log.warn("RPC call queue overflow")
+ op.done(error="Too many concurrent RPC calls")
+
+ def _handle(self, op):
+ try:
+ request = op.value()
+ name, args = self.getMethodNameArgs(request)
+ fn = self.methods[name]
+ rtype = fn._reply_Type
+
+ R = fn(**args)
+
+ if not isinstance(R, Value):
+ try:
+ R = Value(rtype, R)
+ except:
+ _log.exception("Error encoding %s as %s", R, rtype)
+ op.done(error="Error encoding reply")
+ return
+ _log.debug("RPC reply %s -> %r", request, R)
+ op.done(R)
+
+ except RemoteError as e:
+ _log.debug("RPC reply %s -> error: %s", request, e)
+ op.done(error=str(e))
+
+ except:
+ _log.exception("Error handling RPC %s", request)
+ op.done(error="Error handling RPC")
+
+
+[docs]class NTURIDispatcher(RPCDispatcherBase):
+
+ """RPC dispatcher using NTURI (a al. eget)
+
+ Method names are prefixed with a fixed string.
+
+ >>> queue = WorkQueue()
+ >>> class Summer(object):
+ @rpc([('result', 'i')])
+ def add(self, a=None, b=None):
+ return {'result': int(a)+int(b)}
+ >>> installProvider("arbitrary", NTURIDispatcher(queue, target=Summer(), prefix="pv:prefix:"))
+
+ Making a call with the CLI 'eget' utility::
+
+ $ eget -s pv:prefix:add -a a=1 -a b=2
+ ....
+ int result 3
+
+ :param queue WorkQueue: A WorkQueue to which RPC calls will be added
+ :param prefix str: PV name prefix used by RPC methods
+ :param target: The object which has the RPC calls
+ """
+
+ def __init__(self, queue, prefix=None, **kws):
+ RPCDispatcherBase.__init__(self, queue, **kws)
+ self.prefix = prefix
+ self.methods = dict([(prefix + meth, fn) for meth, fn in self.methods.items()])
+ self.channels = set(self.methods.keys())
+ _log.debug('NTURI methods: %s', ', '.join(self.channels))
+
+ def getMethodNameArgs(self, request):
+ # {'schema':'pva', 'path':'pvname', 'query':{'var':'val', ...}}
+ return request.path, dict(request.query.items())
+
+# legecy for MASAR only
+# do not use in new code
+
+
+class MASARDispatcher(RPCDispatcherBase):
+
+ def __init__(self, queue, **kws):
+ RPCDispatcherBase.__init__(self, queue, **kws)
+ _log.debug("MASAR pv %s methods %s", self.channels, self.methods)
+
+ def getMethodNameArgs(self, request):
+ # all through a single PV, method name in request
+ # {'function':'rpcname', 'name':['name', ...], 'value':['val', ...]}
+ return request.function, dict(zip(request.get('name', []), request.get('value', [])))
+
+
+[docs]def quickRPCServer(provider, prefix, target,
+ maxsize=20,
+ workers=1,
+ useenv=True, conf=None, isolate=False):
+ """Run an RPC server in the current thread
+
+ Calls are handled sequentially, and always in the current thread, if workers=1 (the default).
+ If workers>1 then calls are handled concurrently by a pool of worker threads.
+ Requires NTURI style argument encoding.
+
+ :param str provider: A provider name. Must be unique in this process.
+ :param str prefix: PV name prefix. Along with method names, must be globally unique.
+ :param target: The object which is exporting methods. (use the :func:`rpc` decorator)
+ :param int maxsize: Number of pending RPC calls to be queued.
+ :param int workers: Number of worker threads (default 1)
+ :param useenv: Passed to :class:`~p4p.server.Server`
+ :param conf: Passed to :class:`~p4p.server.Server`
+ :param isolate: Passed to :class:`~p4p.server.Server`
+ """
+ from p4p.server import Server
+ import time
+ queue = ThreadedWorkQueue(maxsize=maxsize, workers=workers)
+ provider = NTURIDispatcher(queue, target=target, prefix=prefix, name=provider)
+ threads = []
+ server = Server(providers=[provider], useenv=useenv, conf=conf, isolate=isolate)
+ with server, queue:
+ while True:
+ time.sleep(10.0)
+
+
+[docs]class RPCProxyBase(object):
+
+ """Base class for automatically generated proxy classes
+ """
+ context = None
+ "The Context provided on construction"
+ format = None
+ "The tuple/dict used to format ('%' operator) PV name strings."
+ timeout = 3.0
+ "Timeout of RPC calls in seconds"
+ authority = ''
+ "Authority string sent with NTURI requests"
+ throw = True
+ "Whether call errors raise an exception, or return it"
+ scheme = None # set to override automatic
+
+
+def _wrapMethod(K, V):
+ pv, req = V._call_PV, V._call_Request
+ if sys.version_info >= (3, 0):
+ S = inspect.getfullargspec(V)
+ keywords = S.varkw
+ else:
+ S = inspect.getargspec(V)
+ keywords = S.keywords
+
+ if S.varargs is not None or keywords is not None:
+ raise TypeError("vararg not supported for proxy method %s" % K)
+
+ if len(S.args) != len(S.defaults):
+ raise TypeError("proxy method %s must specify types for all arguments" % K)
+
+ try:
+ NT = NTURI(zip(S.args, S.defaults))
+ except Exception as e:
+ raise TypeError("%s : failed to build method from %s, %s" % (e, S.args, S.defaults))
+
+ @wraps(V)
+ def mcall(self, *args, **kws):
+ pvname = pv % self.format
+ try:
+ uri = NT.wrap(pvname, args, kws, scheme=self.scheme or self.context.name, authority=self.authority)
+ except Exception as e:
+ raise ValueError("Unable to wrap %s %s as %s (%s)" % (args, kws, NT, e))
+ return self.context.rpc(pvname, uri, request=req, timeout=self.timeout, throw=self.throw)
+
+ return mcall
+
+
+[docs]def rpcproxy(spec):
+ """Decorator to enable this class to proxy RPC client calls
+
+ The decorated class constructor takes two additional arguments,
+ `context=` is required to be a :class:`~p4p.client.thread.Context`.
+ `format`= can be a string, tuple, or dictionary and is applied
+ to PV name strings given to :py:func:`rpcall`.
+ Other arguments are passed to the user class constructor. ::
+
+ @rpcproxy
+ class MyProxy(object):
+ @rpccall("%s:add")
+ def add(lhs='d', rhs='d'):
+ pass
+
+ ctxt = Context('pva')
+ proxy = MyProxy(context=ctxt, format="tst:") # evaluates "%s:add"%"tst:"
+
+ The decorated class will be a sub-class of the provided class and :class:`RPCProxyBase`.
+ """
+ # inject our ctor first so we don't have to worry about super() non-sense.
+
+ def _proxyinit(self, context=None, format={}, **kws):
+ assert context is not None, context
+ self.context = context
+ self.format = format
+ spec.__init__(self, **kws)
+ obj = {'__init__': _proxyinit}
+
+ for K, V in inspect.getmembers(spec, lambda M: hasattr(M, '_call_PV')):
+ obj[K] = _wrapMethod(K, V)
+
+ return type(spec.__name__, (RPCProxyBase, spec), obj)
+
+import sys
+import logging
+import warnings
+import re
+import time
+import uuid
+from weakref import ref
+
+from weakref import WeakSet
+
+from .. import _p4p
+from .._p4p import (Server as _Server,
+ StaticProvider as _StaticProvider,
+ DynamicProvider as _DynamicProvider,
+ ServerOperation,
+ )
+
+if sys.version_info >= (3, 0):
+ unicode = str
+
+_log = logging.getLogger(__name__)
+
+__all__ = (
+ 'Server',
+ 'installProvider',
+ 'removeProvider',
+ 'StaticProvider',
+ 'DynamicProvider',
+ 'ServerOperation',
+)
+
+
+
+def clearProviders():
+ _p4p._providers.clear()
+
+[docs]class Server(object):
+
+ """Server(conf=None, useenv=True, providers=[""])
+
+ :param providers: A list of provider names or instances. See below.
+ :param dict conf: Configuration keys for the server. Uses same names as environment variables (aka. EPICS_PVAS_*)
+ :param bool useenv: Whether to use process environment in addition to provided config.
+ :param bool isolate: If True, override conf= and useenv= to select a configuration suitable for isolated testing.
+ eg. listening only on localhost with a randomly chosen port number. Use `conf()` to determine
+ which port is being used.
+
+ Run a PVAccess server serving Channels from the listed providers.
+ The server is running after construction, until stop(). ::
+
+ S = Server(providers=["example"])
+ # do something else
+ S.stop()
+
+ As a convenience, a Server may be used as a context manager to automatically `stop()`. ::
+
+ with Server(providers=["example"]) as S:
+ # do something else
+
+ When configuring a Server, conf keys provided to the constructor have the same name as the environment variables.
+ If both are given, then the provided conf dict is used.
+
+ Call Server.conf() to see a list of valid server (EPICS_PVAS_*) key names and the actual values.
+
+ The providers list may contain: name strings (cf. installProvider()),
+ `StaticProvider` or `DynamicProvider` instances, or a dict "{'pv:name':`SharedPV`}" to implicitly creat a `StaticProvider`.
+ Each entry may also be a tuple "(provider, order)" where "provider" is any of the allowed types,
+ and "order" is an integer used to resolve ambiguity if more than one provider may claim a PV name.
+ (lower numbers are queried first, the default order is 0)
+ """
+
+ def __init__(self, providers, isolate=False, **kws):
+ self.__keep_alive = [] # ick...
+
+ if isinstance(providers, (bytes, unicode)):
+ providers = providers.split() # split on space
+ warnings.warn("Server providers list should be a list", DeprecationWarning)
+
+ Ps = []
+ for provider in providers:
+ if isinstance(provider, tuple):
+ provider, order = provider
+ elif hasattr(provider, 'order'):
+ order = provider.order
+ else:
+ order = 0
+
+ if isinstance(provider, (bytes, unicode)):
+ if not re.match(r'^[^ \t\n\r]+$', provider):
+ raise ValueError("Invalid provider name: '%s'"%provider)
+ Ps.append((provider, order))
+
+ elif isinstance(provider, (_StaticProvider, _DynamicProvider, _p4p.Source)):
+ Ps.append((provider, order))
+
+ elif hasattr(provider, 'items'):
+ P = StaticProvider()
+ for name, pv in provider.items():
+ P.add(name, pv)
+ Ps.append((P, order))
+ # Normally user code is responsible for keeping the StaticProvider alive.
+ # Not possible in this case though.
+ self.__keep_alive.append(P)
+
+ else:
+ raise ValueError("providers=[] must be a list of string, SharedPV, or dict. Not %s"%provider)
+
+ if isolate:
+ assert 'useenv' not in kws and 'conf' not in kws, kws
+ kws['useenv'] = False
+ kws['conf'] = {
+ 'EPICS_PVAS_INTF_ADDR_LIST': '127.0.0.1',
+ 'EPICS_PVA_ADDR_LIST': '127.0.0.1',
+ 'EPICS_PVA_AUTO_ADDR_LIST': '0',
+ 'EPICS_PVA_SERVER_PORT': '0',
+ 'EPICS_PVA_BROADCAST_PORT': '0',
+ }
+
+
+ _log.debug("Starting Server isolated=%s, %s", isolate, kws)
+ self._S = _Server(providers=Ps, **kws)
+
+ self.tostr = self._S.tostr
+
+ self._S.start()
+ try:
+ if _log.isEnabledFor(logging.DEBUG):
+ _log.debug("New Server: %s", self.tostr(5))
+ except:
+ self._S.stop()
+ raise
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, A, B, C):
+ self.stop()
+
+[docs] def conf(self):
+ """Return a dict() with the effective configuration this server is using.
+
+ Suitable to pass to another Server to duplicate this configuration,
+ or to a client Context to allow it to connect to this server. ::
+
+ with Server(providers=["..."], isolate=True) as S:
+ with p4p.client.thread.Context('pva', conf=S.conf(), useenv=False) as C:
+ print(C.get("pv:name"))
+ """
+ return self._S.conf()
+
+[docs] def stop(self):
+ """Force server to stop serving, and close connections to existing clients.
+ """
+ _log.debug("Stopping Server")
+ self._S.stop()
+ self.__keep_alive = []
+
+ @classmethod
+ def forever(klass, *args, **kws):
+ """Create a server and block the calling thread until KeyboardInterrupt.
+ Shorthand for: ::
+
+ with Server(*args, **kws):
+ try;
+ time.sleep(99999999)
+ except KeyboardInterrupt:
+ pass
+ """
+ with klass(*args, **kws):
+ _log.info("Running server")
+ try:
+ while True:
+ time.sleep(100)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ _log.info("Stopping server")
+
+[docs]class StaticProvider(_StaticProvider):
+
+ """A channel provider which servers from a clearly defined list of names.
+ This list may change at any time.
+
+ :param str name: Provider name. Must be unique within the local context in which it is used.
+ None, the default, will choose an appropriate value.
+ """
+ def __init__(self, name=None):
+ if name is None:
+ # Caller doesn't care. Pick something unique w/o spaces
+ name = str(uuid.uuid4())
+ super(StaticProvider, self).__init__(name)
+
+[docs]class DynamicProvider(_DynamicProvider):
+
+ """A channel provider which does not maintain a list of provided channel names.
+
+ The following example shows a simple case, in fact so simple that StaticProvider
+ is a better fit. ::
+
+ class DynHandler(object):
+ def __init__(self):
+ self.pv = SharedPV()
+ def testChannel(self, name): # return True, False, or DynamicProvider.NotYet
+ return name=="blah"
+ def makeChannel(self, name, peer):
+ if name=="blah":
+ return self.pv
+ # return None falls through to next source
+ provider = DynamicProvider("arbitrary", DynHandler())
+ server = Server(providers=[provider])
+ """
+
+ # Return from Handler.testChannel() to prevent caching of negative result.
+ # Use when testChannel('name') might shortly return True
+ NotYet = b'nocache'
+
+ def __init__(self, name, handler):
+ _DynamicProvider.__init__(self, name, self._WrapHandler(handler))
+
+ class _WrapHandler(object):
+
+ "Wrapper around user Handler which logs exception"
+
+ def __init__(self, real):
+ self._real = real
+
+ def testChannel(self, name):
+ try:
+ return self._real.testChannel(name)
+ except:
+ _log.exception("Unexpected")
+
+ def makeChannel(self, name, peer):
+ try:
+ return self._real.makeChannel(name, peer)
+ except:
+ _log.exception("Unexpected")
+
+def _cleanup_servers():
+ _log.debug("Stopping all Server instances")
+ servers = list(_p4p.all_servers)
+ for srv in servers:
+ srv.stop()
+
+
+import logging
+
+from functools import partial
+
+import asyncio
+
+from .raw import SharedPV as _SharedPV, Handler
+from ..client.thread import RemoteError
+from ..client.asyncio import get_running_loop, create_task, all_tasks
+
+__all__ = (
+ 'SharedPV',
+ 'Handler',
+)
+
+_log = logging.getLogger(__name__)
+
+def _log_err(V):
+ if isinstance(V, Exception):
+ _log.error("Unhandled from SharedPV handler: %s", V)
+ # TODO: figure out how to show stack trace...
+ # until then, propagate in the hope that someone else will
+ return V
+
+
+def _handle(pv, op, M, args): # callback in asyncio loop
+ try:
+ _log.debug('SERVER HANDLE %s %s %r', op, M, args)
+ maybeco = M(*args)
+ if asyncio.iscoroutine(maybeco):
+ _log.debug('SERVER SCHEDULE %s', maybeco)
+ task = create_task(maybeco)
+
+ # we have no good place to join async put()/rpc() handler results
+ # other than SharedPV.close(sync=True) which is both optional,
+ # and potentially far in the future. So we log and otherwise
+ # discard the result.
+ task.add_done_callback(_log_err)
+ task._SharedPV = pv # mark so _wait_closed() can distinguish
+ return # caller is responsible for op.done()
+ except RemoteError as e:
+ err = e
+ except Exception as e:
+ _log.exception("Unexpected")
+ err = e
+ if op is not None:
+ op.done(error=str(err))
+
+
+
+
+
+import logging
+_log = logging.getLogger(__name__)
+
+from .._p4p import SharedPV as _SharedPV
+
+__all__ = (
+ 'SharedPV',
+ 'Handler',
+)
+
+
+class ServOpWrap(object):
+
+ def __init__(self, op, wrap, unwrap):
+ self._op, self._wrap, self._unwrap = op, wrap, unwrap
+
+ def value(self):
+ V = self._op.value()
+ try:
+ return self._unwrap(V)
+ except: # py3 will chain automatically, py2 won't
+ raise ValueError("Unable to unwrap %r with %r"%(V, self._unwrap))
+
+ def done(self, value=None, error=None):
+ if value is not None:
+ try:
+ value = self._wrap(value)
+ except:
+ raise ValueError("Unable to wrap %r with %r"%(value, self._wrap))
+ self._op.done(value, error)
+
+ def __getattr__(self, key):
+ return getattr(self._op, key) # dispatch to _p4p.ServerOperation
+
+
+[docs]class Handler(object):
+ """Skeleton of SharedPV Handler
+
+ Use of this as a base class is optional.
+ """
+
+[docs] def put(self, pv, op):
+ """
+ Called each time a client issues a Put
+ operation on this Channel.
+
+ :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+ :param ServerOperation op: The operation being initiated.
+ """
+ op.done(error='Not supported')
+
+[docs] def rpc(self, pv, op):
+ """
+ Called each time a client issues a Remote Procedure Call
+ operation on this Channel.
+
+ :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+ :param ServerOperation op: The operation being initiated.
+ """
+ op.done(error='Not supported')
+
+[docs] def onFirstConnect(self, pv):
+ """
+ Called when the first Client channel is created.
+
+ :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+ """
+ pass
+
+[docs] def onLastDisconnect(self, pv):
+ """
+ Called when the last Client channel is closed.
+
+ :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+ """
+ pass
+
+
+
+
+
+import logging
+_log = logging.getLogger(__name__)
+
+from functools import partial
+from threading import Event
+
+from ..util import _defaultWorkQueue
+from .raw import SharedPV as _SharedPV, Handler
+from ..client.raw import RemoteError
+
+__all__ = (
+ 'SharedPV',
+ 'Handler',
+)
+
+def _on_queue(op, M, *args):
+ try:
+ M(*args)
+ return
+ except RemoteError as e:
+ err = e
+ except Exception as e:
+ _log.exception("Unexpected")
+ err = e
+ if op is not None:
+ op.done(error=str(err))
+
+
+
+
+
+import logging
+_log = logging.getLogger(__name__)
+
+from functools import partial
+
+try:
+ from Queue import Queue, Full, Empty
+except ImportError:
+ from queue import Queue, Full, Empty
+from threading import Thread, Event
+
+__all__ = [
+ 'WorkQueue',
+]
+
+
+[docs]class WorkQueue(object):
+
+ """A threaded work queue.
+ """
+ _stopit = object()
+
+ def __init__(self, maxsize=5):
+ self._Q = Queue(maxsize=maxsize)
+
+ def push(self, callable):
+ self._Q.put_nowait(callable) # throws Queue.Full
+
+ def push_wait(self, callable):
+ self._Q.put(callable)
+
+[docs] def interrupt(self):
+ """Break one call to handle()
+
+ eg. Call N times to break N threads.
+
+ This call blocks if the queue is full.
+ """
+ self._Q.put(self._stopit)
+
+[docs] def handle(self):
+ """Process queued work until interrupt() is called
+ """
+ while True:
+ # TODO: Queue.get() (and anything using thread.allocate_lock
+ # ignores signals :( so timeout periodically to allow delivery
+ try:
+ callable = None # ensure no lingering references to past work while blocking
+ callable = self._Q.get(True, 1.0)
+ except Empty:
+ continue # retry on timeout
+ try:
+ if callable is self._stopit:
+ break
+ callable()
+ except:
+ _log.exception("Error from WorkQueue w/ %r", callable)
+ finally:
+ self._Q.task_done()
+
+class ThreadedWorkQueue(WorkQueue):
+ def __init__(self, name=None, workers=1, daemon=False, **kws):
+ assert workers>=1, workers
+ WorkQueue.__init__(self, **kws)
+ self.name = name
+ self._daemon = daemon
+ self._T = [None]*workers
+
+ def __enter__(self):
+ self.start()
+ def __exit__(self, A,B,C):
+ self.stop()
+
+ def start(self):
+ for n in range(len(self._T)):
+ if self._T[n] is not None:
+ continue
+ T = self._T[n] = Thread(name='%s[%d]'%(self.name, n), target=self.handle)
+ T.daemon = self._daemon
+ T.start()
+
+ return self # allow chaining
+
+ def stop(self):
+ [self.interrupt() for T in self._T if T is not None]
+ [T.join() for T in self._T if T is not None]
+ self._T = [None]*len(self._T)
+
+ return self # allow chaining
+
+ def sync(self, timeout=None):
+ wait1 = [Event() for _n in range(len(self._T))]
+ wait2 = [Event() for _n in range(len(self._T))]
+
+ def syncX(wait1, wait2):
+ wait1.set()
+ wait2.wait()
+
+ [self.push_wait(partial(syncX, wait1[n], wait2[n])) for n in range(len(self._T))]
+
+ # wait for all workers to ready wait1 barrier
+ for W in wait1:
+ W.wait(timeout=timeout)
+
+ # allow workers to proceeed
+ for W in wait2:
+ W.set()
+
+ return self # allow chaining
+
+# lazy create a default work queues
+
+class _DefaultWorkQueue(object):
+
+ def __init__(self, workers=4): # TODO: configurable?
+ self.W = [None]*workers
+ self.n = 0
+
+ def __del__(self):
+ self.stop()
+
+ def __call__(self):
+ W = self.W[self.n]
+ if W is None:
+ # daemon=True otherwise the MainThread exit handler tries to join too early
+ W = self.W[self.n] = ThreadedWorkQueue(maxsize=0, daemon=True).start()
+
+ # sort of load balancing by giving different queues to each SharedPV
+ # but preserve ordering or callbacks as each SharedPV has only one queue
+ self.n = (self.n+1)%len(self.W)
+ return W
+
+ def sync(self):
+ [W.sync() for W in self.W if W is not None]
+
+ def stop(self):
+ [W.stop() for W in self.W if W is not None]
+ self.W = [None]*len(self.W)
+
+_defaultWorkQueue = _DefaultWorkQueue()
+
+# This module defines sub-classes of C extension classes
+# which add functionality that is better expressed in python.
+# These types are then pushed (by _magic) down into extension
+# code where they will be used as the types passed to callbacks.
+from . import _p4p
+
+__all__ = (
+ 'Type',
+ 'Value',
+ 'Struct',
+ 'StructArray',
+ 'Union',
+ 'UnionArray',
+)
+
+
+def Struct(spec=None, id=None):
+ return ('S', id, spec)
+
+
+def Union(spec=None, id=None):
+ return ('U', id, spec)
+
+
+def StructArray(spec=None, id=None):
+ return ('aS', id, spec)
+
+
+def UnionArray(spec=None, id=None):
+ return ('aU', id, spec)
+
+
+[docs]class Type(_p4p._Type):
+ """Type(fields, id=None, base=None)
+
+ :param list fields: A list of tuples describing members of this data structure.
+ :param str id: Type label string.
+ :param Type base: Copy the fields of `Type`, then amend with the provided fields.
+
+ A definition of a data structure consisting of a list of field names and types,
+ as well as an optional type name string (id="").
+ Field type specifications are either a string eg. "d" (double precision float),
+ or a tuple ("S", None, [fields...) defining a sub-structure. ::
+
+ T = Type([
+ ('value', 'I'),
+ ])
+
+ Defines a structure with a single field named 'value' with type u32 (unsigned integer width 32-bit).
+
+ An example of defining a sub-structure. ::
+
+ T = Type([
+ ('value', ('S', None, [
+ ('index', 'i'),
+ ])),
+ ])
+
+ Type specifier codes:
+
+ ==== =======
+ Code Type
+ ==== =======
+ ? bool
+ s unicode
+ b s8
+ B u8
+ h s16
+ H u16
+ i i32
+ I u32
+ l i64
+ L u64
+ f f32
+ d f64
+ v variant
+ U union
+ S struct
+ ==== =======
+ """
+ __slots__ = [] # we don't allow custom attributes for now
+ __contains__ = _p4p._Type.has
+
+ def __call__(self, initial=None):
+ return Value(self, initial)
+
+
+
+
+
+
+
+ def __repr__(self):
+ S, id, fields = self.aspy()
+ return 'Type(%s, id="%s")' % (fields, id)
+ __str__ = __repr__
+
+_p4p.Type = Type
+
+
+[docs]class Value(_p4p._Value):
+ """Value(type[, initial])
+
+ :param Type type: The `Type` describing the structure to be instanciated
+ :param dict initial: A dictionary (or any mapping) which gives initial values for fields.
+
+ Representation of a data structure of a particular :py:class:`Type`.
+
+ Can be created using a :py:class:`Type`, with an optional dict containing initial values. ::
+
+ A = Value(Type([
+ ('value', 'I'),
+ ]), {
+ 'value': 42,
+ })
+
+ Defines a structure with a single field named 'value' with type u32 (unsigned integer width 32-bit).
+
+ An example of defining a sub-structure. ::
+
+ A = Value(Type([
+ ('value', ('S', None, [
+ ('index', 'i'),
+ ])),
+ ]), {
+ 'value': {'index', 5},
+ # 'value.index': 5, # equivalent
+ })
+
+ Defines a structure containing a sub-structure 'value' which has a single field 'index' which is
+ a signed 32-bit integer.
+ """
+ __slots__ = [] # prevent attribute access to invalid fields
+
+ __contains__ = _p4p._Value.has
+
+ def keys(self):
+ """keys() -> Iterable[str]
+ """
+ return self.type().keys()
+
+ def __iter__(self):
+ return iter(self.type())
+
+[docs] def changed(self, *fields):
+ """changed(*fields) -> bool
+ Test if one or more named fields have changed.
+
+ A field is considered to have changed if it is marked as changed,
+ or if its parent, or any child, field is marked as changed.
+ """
+ S = super(Value, self).changed
+ for fld in fields or (None,): # no args tests for any change
+ if S(fld):
+ return True
+ return False
+
+[docs] def changedSet(self, expand=False, parents=False):
+ """changedSet(expand=False, parents=False) -> set
+ :param bool expand: Whether to expand when entire sub-structures are marked as changed.
+ If True, then sub-structures are expanded and only leaf fields will be included.
+ If False, then a direct translation is made, which may include both leaf and sub-structure fields.
+ :param bool parents: If True, include fake entries for parent sub-structures with leaf fields marked as changed.
+ :returns: A :py:class:`set` of names of those fields marked as changed.
+
+ Return a :py:class:`set` containing the names of all changed fields. ::
+
+ A = Value(Type([
+ ('x', 'i'),
+ ('z', ('S', None, [
+ ('a', 'i'),
+ ('b', 'i'),
+ ])),
+ ]), {
+ })
+
+ A.mark('z')
+ assert A.changedSet(expand=False) == {'z'} # only shows fields explicitly marked
+ assert A.changedSet(expand=True) == {'z.a', 'z.b'} # actually used during network transmission
+ A.mark('z.a') # redundant
+ assert A.changedSet(expand=False) == {'z', 'z.a'}
+ assert A.changedSet(expand=True) == {'z.a', 'z.b'}
+ A.unmark('z')
+ assert A.changedSet(expand=False) == {'z.a'}
+ assert A.changedSet(expand=True) == {'z.a'}
+ assert A.changedSet(expand=False, parents=True) == {'z', 'z.a'}
+ assert A.changedSet(expand=True, parents=True) == {'z', 'z.a'}
+
+
+ * expand=False, parents=False gives a direct mapping of the underlying BitSet as it would (get/monitor),
+ or have been (put/rpc), moved over the network.
+ * expand=True, parents=False gives the effective set of leaf fields which will be moved over the network.
+ taking into account the use of whole sub-structure compress/shorthand bits.
+ * expand=False, parents=True gives a way of testing if anything changed within a set of interesting fields
+ (cf. set.intersect).
+ """
+ return _p4p._Value.changedSet(self, expand, parents)
+
+ # TODO: deprecate
+ asSet = changedSet
+
+ def clear(self):
+ self.mark(None, False)
+
+ __str__ = _p4p._Value.tostr
+
+ def __repr__(self):
+ parts = []
+
+ ID = self.getID()
+ if ID!='structure':
+ parts.append('id:'+ID)
+
+ try:
+ parts.append(repr(self.value))
+ except AttributeError: # no .value
+ try:
+ parts.append(repr(self.get(self.type().keys()[0])))
+ except IndexError: # empty Structure
+ pass
+
+ return 'Value(%s)'%', '.join(parts)
+
+_p4p.Value = Value
+