diff --git a/docs/_templates/using_servers.rst b/docs/_templates/using_servers.rst index c913703..d574c53 100644 --- a/docs/_templates/using_servers.rst +++ b/docs/_templates/using_servers.rst @@ -46,10 +46,11 @@ folders. Users will need to specify which folder(s) they would like the data to be placed in before running their experiments.', qoi='The QOI server performs additional data processing and analysis, -extracting quantities of interest from raw and reduced data. +extracting quantities of interest from raw and reduced data.', viz='The viz server visualizes raw and processed data as it becomes available. This server should already be running, only beamline staff should need to -start it.' +start it.', +tomo='The tomo server runs tomographic reconstructions on all scalar data (raw and analyzed). The reconstruction algorithm can be changed via the ``algorithm`` keyword.' ) %} {% for name, blurb in servers_dict.items() %} diff --git a/examples/ctPDF_example.py b/examples/ctPDF_example.py new file mode 100644 index 0000000..ece5740 --- /dev/null +++ b/examples/ctPDF_example.py @@ -0,0 +1,49 @@ +import os +import time + +import bluesky.plans as bp +import dxchange +import numpy as np +import tomopy +from bluesky.run_engine import RunEngine +from ophyd.sim import SynSignal, hw +from xpdan.vend.callbacks.zmq import Publisher +from xpdconf.conf import glbl_dict + +hw = hw() +rot_center = 290 +m = hw.motor1 +m.kind = "hinted" +mm = hw.motor2 +mm.kind = "hinted" +mmm = hw.motor3 +mmm.kind = "hinted" +xrun(0, + bp.grid_scan( + [xpd_pe1c], + mmm, + 0, + 2, + 2, + m, + 0, + 180, + 4, + True, + mm, + 200, + 401, + 4, + True, + + md={ + "tomo": { + "type": "pencil", + "rotation": "motor1", + "translation": "motor2", + "stack": "motor3", + "center": rot_center - 200, + } + }, + ) +) diff --git a/examples/multi_tomo_example.py b/examples/multi_tomo_example.py new file mode 100644 index 0000000..787c68a --- /dev/null +++ b/examples/multi_tomo_example.py @@ -0,0 +1,151 @@ +import os +import time + +import bluesky.plans as bp +import dxchange +import numpy as np +import tomopy +from bluesky.run_engine import RunEngine +from ophyd.sim import SynSignal, hw, SynSignalWithRegistry +from xpdan.vend.callbacks.zmq import Publisher +from xpdconf.conf import glbl_dict + +hw = hw() +fname = os.path.expanduser("~/Downloads/tooth.h5") + +proj, flat, dark, theta = dxchange.read_aps_32id(fname, sino=(0, 1)) + +proj = tomopy.normalize(proj, flat, dark) + +rot_center = tomopy.find_center(proj, theta, init=290, ind=0, tol=0.5) +# proj2 = np.hstack((proj[:, :, :],) * 200) +# proj2 = np.hstack((proj[:, :, :],) * 1) +# rot_center -= 200 +proj2 = proj +m = hw.motor1 +m.kind = "hinted" +mm = hw.motor2 +mm.kind = "hinted" +mmm = hw.motor3 +mmm.kind = "hinted" + + +class FullField: + def __call__(self, *args, **kwargs): + v = m.get()[0] + out = proj2[int(v), :, :] + print(v) + time.sleep(.5) + return out + + +class Pencil: + def __call__(self, *args, **kwargs): + v = m.get()[0] + vv = mm.get()[0] + out = proj2[int(v), :, int(vv)] + print(v, vv, mmm.get()[0]) + time.sleep(.5) + return np.squeeze(out) + + +f = FullField() +# det = SynSignal(f, name="img", labels={"detectors"}) +det = SynSignalWithRegistry(f, name="img", labels={"detectors"},) +det.kind = "hinted" + +#g = Pencil() +#det2 = SynSignal(g, name="img", labels={"detectors"}) +#det2.kind = "hinted" + +RE = RunEngine() +RE.md['analysis_stage'] = 'raw' +p = Publisher(glbl_dict["inbound_proxy_address"], prefix=b"raw") +t = RE.subscribe(p) +# RE.subscribe(print) + +# Build scan +l = [0, 90] +for i in range(8): + ll = l.copy() + interval = sorted(set(ll))[1] / 2 + for lll in ll: + j = lll + interval + j = round(j, 0) + if j not in l and j < 180: + l.append(j) +# Run Full Field Scans, each scan has more slices, showing how we can minimize +# the number of slices by interleaving them by half +for i in [2 ** n for n in range(2, 8)] + [180]: + RE( + bp.list_scan( + [det], + m, + l[:i], + md={ + "tomo": { + "type": "full_field", + "rotation": "motor1", + "center": rot_center, + } + }, + ) + ) + print(i) + time.sleep(3) +''' +# Run in pencil beam geometry (this takes a long time!) +RE( + bp.grid_scan( + [det2], + m, + 0, + 180, + 181, + mm, + 0, + 639, + 640, + True, + md={ + "tomo": { + "type": "pencil", + "rotation": "motor1", + "translation": "motor2", + "center": rot_center, + } + }, + ) +) +# Run in 3D pencil beam geometry +RE( + bp.grid_scan( + [det2], + mmm, + 0, + 2, + 10, + m, + 0, + 180, + 41, + True, + mm, + 200, + 401, + 21, + True, + + md={ + "tomo": { + "type": "pencil", + "rotation": "motor1", + "translation": "motor2", + "stack": "motor3", + "center": rot_center - 200, + } + }, + ) +) +RE.abort() +''' diff --git a/examples/tomo_example.py b/examples/tomo_example.py index 9d2cbc8..36e42e2 100644 --- a/examples/tomo_example.py +++ b/examples/tomo_example.py @@ -6,7 +6,7 @@ import numpy as np import tomopy from bluesky.run_engine import RunEngine -from ophyd.sim import SynSignal, hw +from ophyd.sim import SynSignal, hw, SynSignalWithRegistry from xpdan.vend.callbacks.zmq import Publisher from xpdconf.conf import glbl_dict @@ -26,6 +26,8 @@ m.kind = "hinted" mm = hw.motor2 mm.kind = "hinted" +mmm = hw.motor3 +mmm.kind = "hinted" class FullField: @@ -42,23 +44,26 @@ def __call__(self, *args, **kwargs): v = m.get()[0] vv = mm.get()[0] out = proj2[int(v), :, int(vv)] - print(v, vv) - time.sleep(.1) + print(v, vv, mmm.get()[0]) + time.sleep(.5) return np.squeeze(out) f = FullField() -det = SynSignal(f, name="img", labels={"detectors"}) +# det = SynSignal(f, name="img", labels={"detectors"}) +det = SynSignalWithRegistry(f, name="img", labels={"detectors"},) det.kind = "hinted" -g = Pencil() -det2 = SynSignal(g, name="img", labels={"detectors"}) -det2.kind = "hinted" +#g = Pencil() +#det2 = SynSignal(g, name="img", labels={"detectors"}) +#det2.kind = "hinted" RE = RunEngine() +RE.md['analysis_stage'] = 'raw' p = Publisher(glbl_dict["inbound_proxy_address"], prefix=b"raw") t = RE.subscribe(p) # RE.subscribe(print) + # Build scan l = [0, 90] for i in range(8): @@ -71,7 +76,7 @@ def __call__(self, *args, **kwargs): l.append(j) # Run Full Field Scans, each scan has more slices, showing how we can minimize # the number of slices by interleaving them by half -for i in [2 ** n for n in range(2, 8)] + [180]: +for i in [180]: RE( bp.list_scan( [det], @@ -88,6 +93,7 @@ def __call__(self, *args, **kwargs): ) print(i) time.sleep(3) +''' # Run in pencil beam geometry (this takes a long time!) RE( bp.grid_scan( @@ -111,4 +117,35 @@ def __call__(self, *args, **kwargs): }, ) ) +# Run in 3D pencil beam geometry +RE( + bp.grid_scan( + [det2], + mmm, + 0, + 2, + 10, + m, + 0, + 180, + 41, + True, + mm, + 200, + 401, + 21, + True, + + md={ + "tomo": { + "type": "pencil", + "rotation": "motor1", + "translation": "motor2", + "stack": "motor3", + "center": rot_center - 200, + } + }, + ) +) RE.abort() +''' diff --git a/news/tomo_3D b/news/tomo_3D new file mode 100644 index 0000000..7392a5f --- /dev/null +++ b/news/tomo_3D @@ -0,0 +1,19 @@ +**Added:** + +* Support for 3D pencil beam reconstructions +* Support for Registry backed full field reconstructions +* ``ctPDF_example.py`` example for pencil beam + +**Changed:** + +* ``tomo_example.py`` runs on a registry backed detector + +**Deprecated:** None + +**Removed:** None + +**Fixed:** + +* Pencil beam no longer tries to run independent var reconstructions + +**Security:** None diff --git a/setup.py b/setup.py index 592c595..0ebc6ae 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,7 @@ "viz_server", "analysis_server", "qoi_server", + "tomo_server", ] entry_points = { diff --git a/xpdan/callbacks.py b/xpdan/callbacks.py index 7133499..24e1527 100644 --- a/xpdan/callbacks.py +++ b/xpdan/callbacks.py @@ -10,7 +10,6 @@ from xpdan.io import pdf_saver, dump_yml from xpdan.vend.callbacks.core import Retrieve from xpdtools.dev_utils import _timestampstr -import mayavi.mlab as mlab class StartStopCallback(CallbackBase): @@ -306,57 +305,3 @@ def event(self, doc): "calib": SaveCalib, "raw": SaveMeta, } - - -class Live3DView(CallbackBase): - """Callback for visualizing 3D data """ - - def __init__(self): - self.cs_dict = {} - self.source_dict = {} - self.fields = [] - self.pipeline_dict = {} - - def start(self, doc): - self.cs_dict = {} - self.source_dict = {} - self.fields = [] - self.pipeline_dict = {} - - def descriptor(self, doc): - - self.fields = [ - k for k, v in doc["data_keys"].items() if len(v["shape"]) == 3 - ] - for field in self.fields: - fig = mlab.figure(field) - mlab.clf(fig) - self.cs_dict[field] = fig - self.source_dict[field] = None - self.pipeline_dict[field] = [] - - def event(self, doc): - - for field in self.fields: - data = doc["data"][field] - figure = self.cs_dict[field] - x = self.source_dict[field] - if x is None: - x = mlab.pipeline.scalar_field(data, figure=figure) - self.source_dict[field] = x - for i, orientation in enumerate("xyz"): - self.pipeline_dict[field].append( - mlab.pipeline.image_plane_widget( - x, - plane_orientation=f"{orientation}_axes", - slice_index=data.shape[i] // 2, - figure=figure, - ) - ) - mlab.pipeline.volume(x, figure=figure) - else: - x.mlab_source.scalars = data - for p in self.pipeline_dict[field]: - sl = p.ipw.slice_index - p.update_pipeline() - p.ipw.slice_index = sl diff --git a/xpdan/mayavi_callbacks.py b/xpdan/mayavi_callbacks.py new file mode 100644 index 0000000..b29919c --- /dev/null +++ b/xpdan/mayavi_callbacks.py @@ -0,0 +1,58 @@ +from bluesky.callbacks import CallbackBase +from mayavi import mlab as mlab + + +class Live3DView(CallbackBase): + """Callback for visualizing 3D data """ + + def __init__(self): + self.cs_dict = {} + self.source_dict = {} + self.fields = [] + self.pipeline_dict = {} + + def start(self, doc): + self.cs_dict = {} + self.source_dict = {} + self.fields = [] + self.pipeline_dict = {} + + def descriptor(self, doc): + + self.fields = [ + k for k, v in doc["data_keys"].items() if len(v["shape"]) == 3 + ] + for field in self.fields: + fig = mlab.figure(field) + mlab.clf(fig) + self.cs_dict[field] = fig + self.source_dict[field] = None + self.pipeline_dict[field] = [] + + def event(self, doc): + + for field in self.fields: + data = doc["data"][field] + figure = self.cs_dict[field] + x = self.source_dict[field] + # Don't plot data which is (N, M, 1) because Mayavi doesn't like it + if data.shape[-1] != 1: + if x is None: + x = mlab.pipeline.scalar_field(data, figure=figure) + self.source_dict[field] = x + for i, orientation in enumerate("xyz"): + self.pipeline_dict[field].append( + mlab.pipeline.image_plane_widget( + x, + plane_orientation=f"{orientation}_axes", + slice_index=data.shape[i] // 2, + figure=figure, + ) + ) + mlab.pipeline.volume(x, figure=figure) + else: + x.mlab_source.scalars = data + for p in self.pipeline_dict[field]: + sl = p.ipw.slice_index + p.update_pipeline() + p.ipw.slice_index = sl \ No newline at end of file diff --git a/xpdan/pipelines/main.py b/xpdan/pipelines/main.py index 5cd87ad..34bf6c1 100644 --- a/xpdan/pipelines/main.py +++ b/xpdan/pipelines/main.py @@ -89,12 +89,15 @@ def start_gen( # TODO: change this when new dark logic comes # Check that the data isn't a dark (dark_frame = True when taking a dark) - not_dark_scan = FromEventStream("start", (), upstream=raw_source).map( + not_dark_scan = FromEventStream("start", (), upstream=raw_source, + stream_name='not dark scan').map( lambda x: not x.get("dark_frame", False) ) # Fill the raw event stream source = ( - raw_source.combine_latest(not_dark_scan) + # Emit on works here because we emit on the not_dark_scan first due + # to the ordering of the nodes! + raw_source.combine_latest(not_dark_scan, emit_on=0) .filter(lambda x: x[1]) .pluck(0) .starmap( @@ -102,7 +105,7 @@ def start_gen( ) .filter(lambda x: x[0] not in ["resource", "datum"]) ) - # source.sink(print) + # source.sink(lambda x: print('Source says ', x)) # Get all the documents start_docs = FromEventStream("start", (), source) descriptor_docs = FromEventStream( diff --git a/xpdan/pipelines/tomo.py b/xpdan/pipelines/tomo.py index 639ef9c..b557ab7 100644 --- a/xpdan/pipelines/tomo.py +++ b/xpdan/pipelines/tomo.py @@ -2,11 +2,13 @@ from rapidz import Stream, move_to_first from shed import SimpleToEventStream, SimpleFromEventStream +from xpdan.callbacks import StartStopCallback from xpdan.vend.callbacks.core import StripDepVar import numpy as np -def pencil_tomo(source: Stream, qoi_name, translation, rotation, **kwargs): +def pencil_tomo(source: Stream, qoi_name, translation, rotation, stack=None, + **kwargs): """Extract data from a raw stream for pencil beam tomography Parameters @@ -22,6 +24,10 @@ def pencil_tomo(source: Stream, qoi_name, translation, rotation, **kwargs): dict : The namespace """ + start = SimpleFromEventStream('start', (), upstream=source) + if stack: + stack_position = SimpleFromEventStream("event", ("data", stack), + upstream=source) x = SimpleFromEventStream("event", ("data", translation), upstream=source) th = SimpleFromEventStream("event", ("data", rotation), upstream=source) @@ -49,6 +55,7 @@ def pencil_tomo(source: Stream, qoi_name, translation, rotation, **kwargs): center = SimpleFromEventStream( "start", ("tomo", "center"), upstream=source ) + source.starsink(StartStopCallback()) return locals() @@ -63,18 +70,27 @@ def full_field_tomo(source: Stream, qoi_name, rotation, **kwargs): center = SimpleFromEventStream( "start", ("tomo", "center"), upstream=source ) + source.starsink(StartStopCallback()) return locals() -def tomo_event_stream(source, rec, sinogram, *, qoi_name, **kwargs): +def tomo_event_stream( + source, rec, sinogram, *, qoi_name, rec_3D=None, **kwargs +): raw_stripped = move_to_first(source.starmap(StripDepVar())) rec_tes = SimpleToEventStream( rec, (f"{qoi_name}_tomo",), analysis_stage="{}_tomo".format(qoi_name) ) + # If we have a 3D reconstruction translate it + if rec_3D: + rec_3D_tes = SimpleToEventStream( + rec_3D, + (f"{qoi_name}_tomo_3D",), + analysis_stage="{}_tomo_3D".format(qoi_name), + ) # Don't run the sinogram for now, since it can produce issues with the viz - sinogram.map(np.shape).sink(print) sinogram_tes = SimpleToEventStream( sinogram, (f"{qoi_name}_sinogram",), diff --git a/xpdan/startup/qoi_server.py b/xpdan/startup/qoi_server.py index 1369b0d..2b8a1bc 100644 --- a/xpdan/startup/qoi_server.py +++ b/xpdan/startup/qoi_server.py @@ -15,6 +15,7 @@ def run_server( prefix=None, outbound_proxy_address=glbl_dict["outbound_proxy_address"], inbound_proxy_address=glbl_dict["inbound_proxy_address"], + _publisher=None, **kwargs ): """Start up the visualization server @@ -40,7 +41,10 @@ def run_server( d = RemoteDispatcher(outbound_proxy_address, prefix=prefix) install_qt_kicker(loop=d.loop) - an_with_ind_pub = Publisher(inbound_proxy_address, prefix=b"qoi") + if _publisher is None: + an_with_ind_pub = Publisher(inbound_proxy_address, prefix=b"qoi") + else: + an_with_ind_pub = _publisher raw_source = Stream() diff --git a/xpdan/startup/tomo_server.py b/xpdan/startup/tomo_server.py index 1b32e4c..c119403 100644 --- a/xpdan/startup/tomo_server.py +++ b/xpdan/startup/tomo_server.py @@ -5,6 +5,7 @@ from bluesky.utils import install_qt_kicker from rapidz import Stream, move_to_first from rapidz.link import link +from shed import SimpleToEventStream from xpdan.pipelines.to_event_model import ( to_event_stream_no_ind, to_event_stream_with_ind, @@ -15,22 +16,31 @@ full_field_tomo, ) from xpdan.vend.callbacks import CallbackBase -from xpdan.vend.callbacks.core import RunRouter +from xpdan.vend.callbacks.core import RunRouter, Retrieve from xpdan.vend.callbacks.zmq import Publisher, RemoteDispatcher from xpdconf.conf import glbl_dict from xpdtools.pipelines.tomo import ( tomo_prep, tomo_pipeline_piecewise, tomo_pipeline_theta, + tomo_stack_2D, ) -pencil_order = [ +pencil_order_2D = [ pencil_tomo, tomo_prep, tomo_pipeline_piecewise, tomo_event_stream, ] +pencil_order_3D = [ + pencil_tomo, + tomo_prep, + tomo_pipeline_piecewise, + tomo_stack_2D, + tomo_event_stream, +] + full_field_order = [full_field_tomo, tomo_pipeline_theta, tomo_event_stream] @@ -51,6 +61,7 @@ def __init__(self, pipeline_factory, publisher, **kwargs): self.dim_names = [] self.translation = None self.rotation = None + self.stack = None self.sources = [] self.kwargs = kwargs @@ -61,20 +72,23 @@ def start(self, doc): for d in doc.get("hints", {}).get("dimensions") if d[0][0] != "time" ] - self.translation = doc["tomo"]["translation"] - self.rotation = doc["tomo"]["rotation"] + tomo_dict = doc["tomo"] + self.translation = tomo_dict["translation"] + self.rotation = tomo_dict["rotation"] + if 'stack' in tomo_dict: + self.stack = tomo_dict['stack'] def descriptor(self, doc): + indep_vars = list( + itertools.chain.from_iterable( + [doc["object_keys"][n] for n in self.dim_names] + )) + # TODO: only listen to primary stream dep_shapes = { n: doc["data_keys"][n]["shape"] for n in doc["data_keys"] - if n - not in list( - itertools.chain.from_iterable( - [doc["object_keys"][n] for n in self.dim_names] - ) - ) + if (n not in indep_vars) and (doc["data_keys"][n]['dtype'] not in ['PDFConfig', 'AzimuthalIntegrator']) } # Only compute QOIs on scalars, currently @@ -89,6 +103,7 @@ def descriptor(self, doc): qoi_name=qoi, translation=self.translation, rotation=self.rotation, + stack=self.stack, x_dimension=self.start_doc["shape"][translation_pos], th_dimension=self.start_doc["shape"][rotation_pos], **self.kwargs, @@ -97,7 +112,8 @@ def descriptor(self, doc): ] for p in pipelines: to_event_stream_no_ind( - p["rec_tes"], p["sinogram_tes"], publisher=self.publisher + *[node for node in p.values() if isinstance(node, SimpleToEventStream)], + publisher=self.publisher ) for s in self.sources: @@ -114,7 +130,7 @@ def stop(self, doc): # Need to destroy pipeline -class FullFieldTomoCallback(CallbackBase): +class FullFieldTomoCallback(Retrieve): """This class caches and passes documents into the pencil tomography pipeline. @@ -123,7 +139,10 @@ class FullFieldTomoCallback(CallbackBase): This class acts as a descriptor router for documents""" - def __init__(self, pipeline_factory, publisher, **kwargs): + def __init__(self, pipeline_factory, publisher, handler_reg, + root_map=None, executor=None, + **kwargs): + super().__init__(handler_reg, root_map, executor) self.pipeline_factory = pipeline_factory self.publisher = publisher @@ -134,6 +153,7 @@ def __init__(self, pipeline_factory, publisher, **kwargs): self.kwargs = kwargs def start(self, doc): + super().start(doc) self.start_doc = doc self.dim_names = [ d[0][0] @@ -175,6 +195,7 @@ def descriptor(self, doc): s.emit(("descriptor", doc)) def event(self, doc): + doc = super().event(doc) for s in self.sources: s.emit(("event", doc)) @@ -184,18 +205,28 @@ def stop(self, doc): # Need to destroy pipeline -def tomo_callback_factory(doc, publisher, **kwargs): +def tomo_callback_factory(doc, publisher, handler_reg=None, **kwargs): # TODO: Eventually extract from plan hints? - if doc.get("tomo", {}).get("type", None) == "pencil": - return PencilTomoCallback( - lambda **inner_kwargs: link(*pencil_order, **inner_kwargs), - publisher, - **kwargs, - ) - elif doc.get("tomo", {}).get("type", None) == "full_field": + tomo_dict = doc.get("tomo", {}) + if tomo_dict.get("type", None) == "pencil": + if "stack" in tomo_dict: + return PencilTomoCallback( + lambda **inner_kwargs: link(*pencil_order_3D, **inner_kwargs), + publisher, + **kwargs, + ) + + else: + return PencilTomoCallback( + lambda **inner_kwargs: link(*pencil_order_2D, **inner_kwargs), + publisher, + **kwargs, + ) + elif tomo_dict.get("type", None) == "full_field": return FullFieldTomoCallback( lambda **inner_kwargs: link(*full_field_order, **inner_kwargs), publisher, + handler_reg, **kwargs, ) @@ -205,6 +236,7 @@ def run_server( inbound_proxy_address=glbl_dict["inbound_proxy_address"], outbound_prefix=(b"raw", b"an", b"qoi"), inbound_prefix=b"tomo", + _publisher=None, **kwargs, ): """Server for performing tomographic reconstructions @@ -228,10 +260,16 @@ def run_server( """ print(kwargs) - publisher = Publisher(inbound_proxy_address, prefix=inbound_prefix) + db = glbl_dict['exp_db'] + handler_reg = db.reg.handler_reg + if _publisher is None: + publisher = Publisher(inbound_proxy_address, prefix=inbound_prefix) + else: + publisher = _publisher rr = RunRouter( - [lambda x: tomo_callback_factory(x, publisher=publisher, **kwargs)] + [lambda x: tomo_callback_factory(x, publisher=publisher, + handler_reg=handler_reg, **kwargs)] ) d = RemoteDispatcher(outbound_proxy_address, prefix=outbound_prefix) diff --git a/xpdan/startup/viz_server.py b/xpdan/startup/viz_server.py index 1ced2b7..28c691c 100644 --- a/xpdan/startup/viz_server.py +++ b/xpdan/startup/viz_server.py @@ -3,7 +3,7 @@ import numpy as np from bluesky.utils import install_qt_kicker from matplotlib.colors import SymLogNorm -from xpdan.callbacks import Live3DView +from xpdan.mayavi_callbacks import Live3DView from xpdan.vend.callbacks.best_effort import BestEffortCallback from xpdan.vend.callbacks.broker import LiveImage from xpdan.vend.callbacks.core import RunRouter @@ -61,9 +61,9 @@ def run_server( ), x, ), - lambda x: BestEffortCallback(table_enabled=False, overplot=False), - lambda x: LiveWaterfall(), lambda x: Live3DView(), + lambda x: LiveWaterfall(), + lambda x: BestEffortCallback(table_enabled=False, overplot=False), ] ) diff --git a/xpdan/tests/startup/test_servers.py b/xpdan/tests/startup/test_servers.py index 5524bee..e14da9e 100644 --- a/xpdan/tests/startup/test_servers.py +++ b/xpdan/tests/startup/test_servers.py @@ -19,6 +19,7 @@ from xpdan.startup.analysis_server import run_server as analysis_run_server from xpdan.startup.db_server import run_server as db_run_server from xpdan.startup.qoi_server import run_server as qoi_run_server +from xpdan.startup.tomo_server import run_server as tomo_run_server from xpdan.vend.callbacks.core import Retrieve from xpdan.vend.callbacks.zmq import Publisher @@ -225,6 +226,7 @@ def run_exp(delay): # pragma: no cover p = Publisher(proxy[0], prefix=b"raw") RE.subscribe(p) det = SynSignal(func=lambda: np.ones(10), name='gr') + RE(bp.count([det], md=dict(analysis_stage="raw"))) RE(bp.count([det], md=dict(analysis_stage="pdf"))) # Run experiment in another process (after delay) @@ -233,11 +235,131 @@ def run_exp(delay): # pragma: no cover # send the message that will eventually kick us out of the server loop threading.Thread(target=delayed_sigint, args=(10,)).start() + L = [] try: print("running server") - qoi_run_server() + qoi_run_server(_publisher=lambda *x: L.append(x)) except KeyboardInterrupt: print("finished server") exp_proc.terminate() exp_proc.join() + assert L + + +def test_tomo_run_server_2d_pencil(tmpdir, proxy, RE, hw): + def delayed_sigint(delay): # pragma: no cover + time.sleep(delay) + print("killing") + os.kill(os.getpid(), signal.SIGINT) + + def run_exp(delay): # pragma: no cover + time.sleep(delay) + print("running exp") + + p = Publisher(proxy[0], prefix=b"raw") + RE.subscribe(p) + + RE(bp.grid_scan([hw.noisy_det], + hw.motor1, 0, 2, 2, + hw.motor2, 0, 2, 2, True, + md={'tomo': {'type': 'pencil', + 'rotation': 'motor1', + "translation": "motor2", + 'center': 1}})) + + # Run experiment in another process (after delay) + exp_proc = multiprocessing.Process(target=run_exp, args=(2,), daemon=True) + exp_proc.start() + + # send the message that will eventually kick us out of the server loop + threading.Thread(target=delayed_sigint, args=(10,)).start() + L = [] + try: + print("running server") + tomo_run_server(_publisher=lambda *x: L.append(x)) + + except KeyboardInterrupt: + print("finished server") + exp_proc.terminate() + exp_proc.join() + assert L + + +def test_tomo_run_server_3d_pencil(tmpdir, proxy, RE, hw): + def delayed_sigint(delay): # pragma: no cover + time.sleep(delay) + print("killing") + os.kill(os.getpid(), signal.SIGINT) + + def run_exp(delay): # pragma: no cover + time.sleep(delay) + print("running exp") + + p = Publisher(proxy[0], prefix=b"raw") + RE.subscribe(p) + + RE(bp.grid_scan([hw.noisy_det], + hw.motor3, 0, 2, 2, + hw.motor1, 0, 2, 2, True, + hw.motor2, 0, 2, 2, True, + md={'tomo': {'type': 'pencil', + 'rotation': 'motor1', + "translation": "motor2", + "stack": "motor3", + 'center': 1}})) + + # Run experiment in another process (after delay) + exp_proc = multiprocessing.Process(target=run_exp, args=(2,), daemon=True) + exp_proc.start() + + # send the message that will eventually kick us out of the server loop + threading.Thread(target=delayed_sigint, args=(10,)).start() + L = [] + try: + print("running server") + tomo_run_server(_publisher=lambda *x: L.append(x)) + + except KeyboardInterrupt: + print("finished server") + exp_proc.terminate() + exp_proc.join() + assert L + + +def test_tomo_run_server_full_field(tmpdir, proxy, RE, hw): + def delayed_sigint(delay): # pragma: no cover + time.sleep(delay) + print("killing") + os.kill(os.getpid(), signal.SIGINT) + + def run_exp(delay): # pragma: no cover + time.sleep(delay) + print("running exp") + + p = Publisher(proxy[0], prefix=b"raw") + RE.subscribe(p) + + det = SynSignal(func=lambda: np.ones((10, 10)), name='gr') + RE(bp.scan([det], + hw.motor1, 0, 2, 2, + md={'tomo': {'type': 'full_field', + 'rotation': 'motor1', + 'center': 1}})) + + # Run experiment in another process (after delay) + exp_proc = multiprocessing.Process(target=run_exp, args=(2,), daemon=True) + exp_proc.start() + + # send the message that will eventually kick us out of the server loop + threading.Thread(target=delayed_sigint, args=(10,)).start() + L = [] + try: + print("running server") + tomo_run_server(_publisher=lambda *x: L.append(x)) + + except KeyboardInterrupt: + print("finished server") + exp_proc.terminate() + exp_proc.join() + assert L diff --git a/xpdan/vend/callbacks/broker.py b/xpdan/vend/callbacks/broker.py index 215ad0b..a439cfd 100644 --- a/xpdan/vend/callbacks/broker.py +++ b/xpdan/vend/callbacks/broker.py @@ -90,7 +90,8 @@ def __init__( norm=None, limit_func=None, auto_redraw=True, - interpolation=None + interpolation=None, + aspect=None ): super().__init__(handler_reg=handler_reg) self.interpolation = interpolation @@ -100,16 +101,30 @@ def __init__( self.cmap = cmap self.fields = [] self.cs_dict = {} + self.aspect = aspect def descriptor(self, doc): + data_keys = doc["data_keys"] self.fields = [ - k for k, v in doc["data_keys"].items() if len(v["shape"]) == 2 + k for k, v in data_keys.items() if len(v["shape"]) == 2 ] + if self.aspect is None: + aspects = [] + for field in self.fields: + aspect_ratio = data_keys[field]["shape"][0] / data_keys[field]["shape"][1] + if any(s == -1 for s in data_keys[field]["shape"]): + aspects.append('auto') + elif aspect_ratio > 1.25 or aspect_ratio < .75: + aspects.append('auto') + else: + aspects.append('equal') + else: + aspects = [self.aspect] * len(self.fields) from xray_vision.backend.mpl.cross_section_2d import CrossSection import matplotlib.pyplot as plt # only make new figure for new data otherwise use old data - for field in self.fields: + for field, asp in zip(self.fields, aspects): if field in self.cs_dict and plt.fignum_exists( self.cs_dict[field]._fig.number ): @@ -122,6 +137,7 @@ def descriptor(self, doc): self.limit_func, self.auto_redraw, self.interpolation, + aspect=asp ) cs._fig.canvas.set_window_title(field) cs._fig.show() diff --git a/xpdan/vend/callbacks/core.py b/xpdan/vend/callbacks/core.py index 5b36aa3..de28edf 100644 --- a/xpdan/vend/callbacks/core.py +++ b/xpdan/vend/callbacks/core.py @@ -570,9 +570,9 @@ def __init__(self, handler_reg, root_map=None, executor=None): root_map = {} if handler_reg is None: handler_reg = {} - self.resources = None - self.handlers = None - self.datums = None + self.resources = {} + self.handlers = {} + self.datums = {} self.root_map = root_map self.handler_reg = handler_reg