Merge pull request #185 from CJ-Wright/new_api
New to event model API
CJ-Wright authored May 3, 2019
2 parents 6d42fc3 + 202b420 commit f03cd02
Showing 5 changed files with 501 additions and 118 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -14,3 +14,4 @@ omit =

exclude_lines =
if __name__ == '__main__':
# pragma: no coverage
14 changes: 14 additions & 0 deletions news/new_api
@@ -0,0 +1,14 @@
**Added:**

* ``shed.simple.simple_to_event_stream_new_api``, which provides a new API
for describing the data that will be converted to the event model

**Changed:** None

**Deprecated:** None

**Removed:** None

**Fixed:** None

**Security:** None
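
A minimal sketch of the new API named in this news entry (node names here
are illustrative and assume shed and rapidz are importable; the full worked
example is in the docstring in shed/simple.py below):

from rapidz import Stream
from shed import simple_from_event_stream, simple_to_event_stream_new_api

source = Stream()
# pull the 'data' payload out of incoming event documents
events = simple_from_event_stream(source, 'event', ('data',), principle=True)
# the new API: map each upstream node to its descriptor metadata
node = simple_to_event_stream_new_api({events: {'name': 'primary'}})
node.sink(print)  # ('start', ...), ('descriptor', ...), ('event', ...), ('stop', ...)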
294 changes: 291 additions & 3 deletions shed/simple.py
@@ -1,12 +1,13 @@
"""Translation nodes"""
import time
import uuid
-from collections import deque
+from collections import deque, Mapping

import networkx as nx
import numpy as np
-from event_model import compose_descriptor
+from event_model import compose_descriptor, compose_run
from rapidz.core import Stream, zip as szip, move_to_first
-from shed.doc_gen import CreateDocs
+from shed.doc_gen import CreateDocs, get_dtype
from xonsh.lib.collections import ChainDB, _convert_to_dict

ALL = "--ALL THE DOCS--"
@@ -265,6 +266,293 @@ def update(self, x, who=None):
return rl


_GLOBAL_SCAN_ID = 0


@Stream.register_api()
class simple_to_event_stream_new_api(Stream):
Converts data into an event stream and passes it downstream.

Parameters
----------
descriptor_dicts: dict
Dictionary describing the mapping between streams and their associated
metadata. Top level keys are the streams to accept data from.
The values for these keys are the entries of the ``descriptor``
document (``data_keys``, ``name``, ``configuration``, etc.; see
https://nsls-ii.github.io/bluesky/event_descriptors.html for more
details). Note that some of this data is automatically generated if
not provided.
stream_name : str, optional
Name for this stream node

Notes
-----
The result emitted from this stream follows the document model.
This is essentially a state machine. Transitions are:
start -> stop
start -> descriptor -> event -> stop
Note that start -> start is not allowed; this node always issues a stop
document so the data input times can be stored.
Additionally, note that this takes advantage of Python 3.6+ order-stable
dictionaries. Since the key order is stable, the data keys can be given
in the same order as the elements of streams which contain multiple
elements; see stream ``a`` in the example.

Examples
--------
from pprint import pprint
from rapidz import Stream
from shed import simple_from_event_stream, simple_to_event_stream_new_api
import operator as op
source = Stream()
stop = simple_from_event_stream(source, 'stop', ())
fes = simple_from_event_stream(source, 'event', ('data', 'motor1'),
principle=True)
fes2 = simple_from_event_stream(source, 'event', ('data', ),
principle=True)
a = fes.map(op.add, 2).zip(fes)
b = fes.combine_latest(stop, emit_on=stop).pluck(0)
node = simple_to_event_stream_new_api(
{
a: {
'data_keys': {'motor1_2': {}, 'motor1': {}},
},
b: {
'name': 'final',
'data_keys': {'motor1': {}},
'configuration': {'motor1': {'hi': 'world'}}
},
fes2: {'name': 'raw'}
}
)
node2 = simple_to_event_stream_new_api(
{fes2: {'name': 'primary'}}
)
from ophyd.sim import hw
hw = hw()
from bluesky.run_engine import RunEngine
RE = RunEngine()
import bluesky.plans as bp
# node2.sink(pprint)
node.sink(pprint)
RE.subscribe(lambda *x: source.emit(x))
RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))
prints:
('start',...)
('descriptor',...)
('event',...)
...
('stop',...)
"""

def __init__(self, descriptor_dicts, stream_name=None, **kwargs):
self.descriptor_dicts = descriptor_dicts
self.descriptors = {}
if stream_name is None:
stream_name = ""
for v in descriptor_dicts.values():
stream_name += f"{v.get('name', 'primary')} " + ", ".join(
v.get("data_keys", {}).keys()
)

Stream.__init__(
self,
upstreams=[k for k in descriptor_dicts.keys()],
stream_name=stream_name,
)
self.md = kwargs

move_to_first(self)

self.incoming_start_uid = None
self.incoming_stop_uid = None

self.state = "stopped"
self.subs = []

self.uid = str(uuid.uuid4())

# walk upstream to get all upstream nodes to the translation node
# get start_uids from the translation node
self.graph = nx.DiGraph()
walk_to_translation(self, graph=self.graph)

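# index every translation node reachable from this node (excluding
# itself); their start uids become parent metadata on the start document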
self.translation_nodes = {
k: n["stream"]
for k, n in self.graph.node.items()
if isinstance(
n["stream"],
(
simple_from_event_stream,
simple_to_event_stream,
simple_to_event_stream_new_api,
),
)
and n["stream"] != self
}
self.principle_nodes = [
n
for k, n in self.translation_nodes.items()
if getattr(n, "principle", False)
or isinstance(n, SimpleToEventStream)
]
if not self.principle_nodes:
raise RuntimeError(
f"No Principle Nodes Detected for node "
f"{stream_name}, "
f"{[k.data_address for k in self.translation_nodes.values()]}"
)
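# subscribe to each principle node so it pushes start and stop documents
# to this node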
for p in self.principle_nodes:
p.subs.append(self)

def emit_start(self, x):
# If we have already seen this start document, do nothing; we have
# multiple parents, so we may get the same start doc multiple times
name, doc = x
if doc["uid"] is self.incoming_start_uid:
return
else:
self.incoming_start_uid = doc["uid"]
# Prime stop document
self.incoming_stop_uid = None
# Emergency stop if we get a new start document and no stop has been
# issued
if self.state != "stopped":
self.emit_stop(x)
# This is a bit of jank to make certain we don't override the
# user metadata with pipeline metadata
old_md = dict(self.md)

self.md.update(
dict(
parent_uids=list(
set(
[
v.start_uid
for k, v in self.translation_nodes.items()
if v.start_uid is not None
]
)
),
parent_node_map={
v.uid: v.start_uid
for k, v in self.translation_nodes.items()
if v.start_uid is not None
},
# keep track of this so we know which node we're sending
# data from (see merkle hash in DBFriendly)
outbound_node=self.uid,
)
)
global _GLOBAL_SCAN_ID
_GLOBAL_SCAN_ID += 1
self.md.update(scan_id=_GLOBAL_SCAN_ID)
bundle = compose_run(metadata=self.md, validate=False)
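# compose_run returns the start document plus factories for composing
# the descriptor, resource, and stop documents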
start, self.desc_fac, self.resc_fac, self.stop_factory = bundle
self.start_uid = start["uid"]
self.md = old_md

# emit starts to subs first in case we create an event from the start
[s.emit_start(x) for s in self.subs]
self.emit(("start", start))
self.state = "started"

def emit_stop(self, x):
name, doc = x
if doc["uid"] is self.incoming_stop_uid:
return
else:
self.incoming_stop_uid = doc["uid"]
# Prime for next run
self.incoming_start_uid = None
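# compose the stop document with the factory captured in emit_start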
stop = self.stop_factory()
self.descriptors.clear()
ret = self.emit(("stop", stop))
[s.emit_stop(x) for s in self.subs]
self.state = "stopped"
return ret

def update(self, x, who=None):
rl = []
# Events may only be emitted once a start document has been issued.
if self.state == "stopped": # pragma: no coverage
raise RuntimeError(
"Can't emit events from a stopped state "
"it seems that a start was not emitted"
)

descriptor_dict = self.descriptor_dicts[who]
data_keys = descriptor_dict.setdefault("data_keys", {})

# If there are no data_keys then we are taking in a dict and the
# keys of the dict will be the keys for the stream
if data_keys == {}:
if not isinstance(x, Mapping): # pragma: no coverage
raise TypeError(
f"No data keys were provided so expected "
f"Mapping, but {type(x)} found"
)
data_keys = {k: {} for k in x}

# If the incoming data is a dict extract the data as a tuple
if isinstance(x, Mapping):
x = tuple([x[k] for k in data_keys.keys()])
# normalize the data to a tuple
if not isinstance(x, tuple):
tx = tuple([x])
# XXX: need to do something where the data is a tuple!
elif len(data_keys) == 1:
tx = tuple([x])
else:
tx = x

# If we haven't issued a descriptor yet make one
if who not in self.descriptors:
# merge the auto-generated metadata with the user-supplied metadata
# via ChainDB, with resolution favoring the user's input
descriptor, self.descriptors[who], _ = self.desc_fac(
**_convert_to_dict(
ChainDB(
dict(
name="primary",
data_keys={
k: {
"source": "analysis",
"dtype": get_dtype(xx),
"shape": getattr(xx, "shape", []),
**data_keys[k],
}
for k, xx in zip(data_keys, tx)
},
hints={
"analyzer": {"fields": sorted(list(data_keys))}
},
object_keys={k: [k] for k in data_keys},
),
descriptor_dict,
)
),
validate=False,
)
rl.append(self.emit(("descriptor", descriptor)))
self.state = "described"
event = self.descriptors[who](
timestamps={k: time.time() for k in data_keys},
data={k: v for k, v in zip(data_keys, tx)},
validate=False,
)
rl.append(self.emit(("event", event)))

return rl


@Stream.register_api()
class SimpleToEventStream(simple_to_event_stream):
pass
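
To make the data normalization in ``update`` above concrete, here is a
standalone sketch of just that step (hypothetical helper name and sample
data, not part of this diff): when no ``data_keys`` are supplied the input
must be a Mapping and its keys become the stream's data keys, after which
the payload is coerced to a tuple ordered like the keys.

from collections.abc import Mapping

def normalize(x, data_keys):
    # infer the data keys from a dict input when none were provided
    if not data_keys:
        if not isinstance(x, Mapping):
            raise TypeError(
                "No data keys were provided so expected Mapping, "
                f"but {type(x)} found"
            )
        data_keys = {k: {} for k in x}
    # extract dict values as a tuple ordered like data_keys
    if isinstance(x, Mapping):
        x = tuple(x[k] for k in data_keys)
    # coerce everything else to a tuple for uniform downstream handling
    if not isinstance(x, tuple):
        tx = (x,)
    elif len(data_keys) == 1:
        tx = (x,)
    else:
        tx = x
    return data_keys, tx

print(normalize({'motor1': 1, 'det': 2.0}, {}))
# -> ({'motor1': {}, 'det': {}}, (1, 2.0))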