From 6a4a267cb4dde2f3ba578d7722ef5d564458d6d2 Mon Sep 17 00:00:00 2001 From: Thomas D Grant Date: Thu, 2 May 2024 07:30:14 -0700 Subject: [PATCH 1/3] added back swaxs files --- src/om/graphical_interfaces/swaxs_gui.py | 291 ++++++++ src/om/lib/radial_profile.py | 843 +++++++++++++++++++++++ src/om/processing_layer/__init__.py | 1 + 3 files changed, 1135 insertions(+) create mode 100644 src/om/graphical_interfaces/swaxs_gui.py create mode 100644 src/om/lib/radial_profile.py diff --git a/src/om/graphical_interfaces/swaxs_gui.py b/src/om/graphical_interfaces/swaxs_gui.py new file mode 100644 index 00000000..0474d047 --- /dev/null +++ b/src/om/graphical_interfaces/swaxs_gui.py @@ -0,0 +1,291 @@ +# This file is part of OM. +# +# OM is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# OM is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +# PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with OM. +# If not, see . +# +# Copyright 2020 SLAC National Accelerator Laboratory +# +# Based on OnDA - Copyright 2014-2019 Deutsches Elektronen-Synchrotron DESY, +# a research centre of the Helmholtz Association. +""" +OM's GUI for Crystallography. + +This module contains the implementation of a graphical interface that displays reduced +and aggregated data in crystallography experiments. +""" + +# TODO: Documentation of this whole file. + +import signal +import sys +import time +from typing import Any, Dict + +import click +import numpy +from numpy.typing import NDArray + +from om.graphical_interfaces.common import OmGuiBase +from om.lib.exceptions import OmMissingDependencyError + +try: + from PyQt5 import QtCore, QtWidgets # type: ignore +except ImportError: + raise OmMissingDependencyError( + "The following required module cannot be imported: PyQt5" + ) + +try: + import pyqtgraph # type: ignore +except ImportError: + raise OmMissingDependencyError( + "The following required module cannot be imported: pyqtgraph" + ) + + +class SwaxsGui(OmGuiBase): + """ + See documentation of the `__init__` function. + + Base class: [`OmGui`][om.graphical_interfaces.base.OmGui] + """ + + def __init__(self, url: str) -> None: + """ + OM graphical user interface for crystallography. + + This class implements a graphical user interface for crystallography + experiments. It is a subclass of the [OmGui] + [om.graphical_interfaces.base.OmGui] base class. + + This GUI receives reduced and aggregated data from an OnDA Monitor for + Crystallography when it is tagged with the 'omdata' label. The data must + contain information about peaks detected in the frames recently processed by + the monitor and information about the current hit rate. + + The GUI displays a plot showing the evolution of the hit rate over time, plus a + virtual powder pattern created using the detected peaks. + + Arguments: + + url (str): the URL at which the GUI will connect and listen for data. This + must be a string in the format used by the ZeroMQ Protocol. 
+ """ + super(SwaxsGui, self).__init__( + url=url, + tag="omdata", + ) + + self._received_data: Dict[str, Any] = {} + + pyqtgraph.setConfigOption("background", 0.2) + + # radial profiles + self._radial_widget: Any = pyqtgraph.PlotWidget() + self._radial_widget.addLegend(offset=(0, 100)) + self._radial_widget.setLabel(axis="left", text="I(q)") + self._radial_widget.setLabel(axis="bottom", text="q (1/angstrom)") + self._radial_plot: Any = self._radial_widget.plot( + tuple(range(1000, 0)), [0.0] * 1000, name="frame" + ) + self._recent_avg_plot: Any = self._radial_widget.plot( + tuple(range(1000, 0)), [0.0] * 1000, pen=pyqtgraph.mkPen("y"), name="recent" + ) + self._cumulative_hits_radial_plot: Any = self._radial_widget.plot( + tuple(range(1000, 0)), [0.0] * 1000, pen=pyqtgraph.mkPen("c"), name="hits only" + ) + + self._hit_rate_widget: Any = pyqtgraph.PlotWidget() + self._hit_rate_widget.addLegend() + self._hit_rate_widget.setTitle("Hit Rate vs. Events") + self._hit_rate_widget.setLabel(axis="bottom", text="Events") + self._hit_rate_widget.setLabel(axis="left", text="Hit Rate") + self._hit_rate_widget.showGrid(x=True, y=True) + self._hit_rate_widget.setYRange(0, 1.0) + + self._hit_rate_plot: Any = self._hit_rate_widget.plot( + pen=None, + symbol="o", + symbolPen=pyqtgraph.mkPen("y"), + symbolSize=3, + name="Hit Rate", + ) + + self._roi_widget: Any = pyqtgraph.PlotWidget() + self._roi_widget.addLegend() + self._roi_widget.setTitle("Intensity of ROI vs. Events") + self._roi_widget.setLabel(axis="bottom", text="Events") + self._roi_widget.setLabel(axis="left", text="ROI Intensity") + self._roi_widget.showGrid(x=True, y=True) + self._roi_widget.setYRange(-1e-3, 1.0e-2) + + self._roi1_plot: Any = self._roi_widget.plot( + tuple(range(-5000, 0)), + [0.0] * 5000, + pen=None, + symbol="o", + symbolPen=pyqtgraph.mkPen("y"), + symbolSize=3, + name="ROI1", + ) + self._roi2_plot: Any = self._roi_widget.plot( + tuple(range(-5000, 0)), + [0.0] * 5000, + pen=None, + symbol="o", + symbolPen=pyqtgraph.mkPen("c"), + symbolSize=3, + name="ROI2", + ) + self._frame_mean_plot: Any = self._roi_widget.plot( + tuple(range(-5000, 0)), + [0.0] * 5000, + pen=None, + symbol="o", + symbolPen=pyqtgraph.mkPen("w"), + symbolSize=3, + name="Frame Mean", + ) + + self._rg_widget: Any = pyqtgraph.PlotWidget() + self._rg_widget.addLegend() + self._rg_widget.setTitle("Particle Size vs. 
Events") + self._rg_widget.setLabel(axis="bottom", text="Events") + self._rg_widget.setLabel(axis="left", text="Particle Size") + self._rg_widget.showGrid(x=True, y=True) + self._rg_widget.setYRange(0, 1.0e3) + + self._rg_plot: Any = self._rg_widget.plot( + tuple(range(-5000, 0)), + [0.0] * 5000, + pen=None, + symbol="o", + symbolPen=pyqtgraph.mkPen("y"), + symbolSize=3, + name="Particle Size", + ) + + self._radial_stack_view: Any = pyqtgraph.ImageView() + self._radial_stack_view.view.setAspectLocked(False) + self._radial_stack_view.setLevels(0, 12.0) + self._radial_stack_view.ui.histogram.gradient.loadPreset("flame") + + pos = numpy.linspace(0, 1.0, 5) + colors = [(0, 0, 0), (0, 0, 255), (0, 255, 0), (255, 255, 0), (255, 0, 0)] + colormap = pyqtgraph.ColorMap(pos, colors) + self._radial_stack_view.setColorMap(colormap) + + horizontal_layout: Any = QtWidgets.QHBoxLayout() + + splitter_0: Any = QtWidgets.QSplitter() + splitter_0.addWidget(self._radial_stack_view) + + vertical_splitter: Any = QtWidgets.QSplitter(QtCore.Qt.Vertical) + vertical_splitter.addWidget(self._radial_widget) + vertical_splitter.addWidget(self._hit_rate_widget) + vertical_splitter.addWidget(self._roi_widget) + vertical_splitter.addWidget(self._rg_widget) + splitter_0.addWidget(vertical_splitter) + horizontal_layout.addWidget(splitter_0) + self._central_widget: Any = QtWidgets.QWidget() + self._central_widget.setLayout(horizontal_layout) + self.setCentralWidget(self._central_widget) + self.show() + + QtWidgets.QApplication.processEvents() + + def update_gui(self) -> None: + """ + Updates the elements of the Swaxs GUI. + + This method overrides the corresponding method of the base class: please also + refer to the documentation of that class for more information. + + This function stores the data received from OM, and calls the internal + functions that update the hit rate history plot and the virtual power pattern. + """ + if self._received_data: + # Resets the 'received_data' attribute to None. One can then check if + # data has been received simply by checking wether the attribute is not + # None. + local_data: Dict[str, Any] = self._received_data + self._received_data = {} + else: + # If no data has been received, returns without drawing anything. 
+            return
+
+        # self._last_pixel_size: float = local_data["pixel_size"]
+        # self._last_detector_distance: float = local_data["detector_distance"]
+        # self._last_beam_energy: float = local_data["beam_energy"]
+        # self._last_coffset: float = local_data["first_panel_coffset"]
+
+        radial: NDArray[numpy.float_] = local_data["radial_profile"]
+        q: NDArray[numpy.float_] = local_data["q"]
+        self._radial_plot.setData(q, radial)
+
+        recent_avg: NDArray[numpy.float_] = local_data["recent_radial_average"]
+        self._recent_avg_plot.setData(q, recent_avg)
+
+        cumulative_hits_radial: NDArray[numpy.float_] = local_data["cumulative_hits_radial"]
+        self._cumulative_hits_radial_plot.setData(q, cumulative_hits_radial)
+
+        self._radial_stack_view.setImage(
+            local_data["radial_stack"].T,
+            autoHistogramRange=False,
+            autoLevels=False,
+            autoRange=False,
+        )
+
+        self._hit_rate_plot.setData(
+            tuple(range(-5000, 0)), local_data["hit_rate_history"]
+        )
+
+        self._roi1_plot.setData(tuple(range(-5000, 0)), local_data["roi1_int_history"])
+        self._roi2_plot.setData(tuple(range(-5000, 0)), local_data["roi2_int_history"])
+        self._frame_mean_plot.setData(tuple(range(-5000, 0)), local_data["image_sum_history"])
+
+        self._rg_plot.setData(tuple(range(-5000, 0)), local_data["rg_history"])
+
+        QtWidgets.QApplication.processEvents()
+
+        # Computes the estimated age of the received data and prints it into the status
+        # bar (a GUI is supposed to be a Qt MainWindow widget, so it is supposed to
+        # have a status bar).
+        time_now: float = time.time()
+        estimated_delay: float = round(time_now - local_data["timestamp"], 6)
+        self.statusBar().showMessage(f"Estimated delay: {estimated_delay} seconds")
+
+
+@click.command()
+@click.argument("url", type=str, required=False)
+def main(url: str) -> None:
+    """
+    OM Graphical User Interface for SWAXS. This program must connect to a running
+    OnDA Monitor for SWAXS experiments. If the monitor broadcasts the necessary
+    information, this GUI will display the evolution of the hit rate, ROI intensities
+    and estimated particle size over time, plus the most recently computed radial
+    intensity profiles.
+
+    The GUI connects to an OnDA Monitor running at the IP address (or hostname)
+    specified by the URL string. This is a string in the format used by the ZeroMQ
+    Protocol. The URL string is optional. If not provided, it defaults to
+    "tcp://127.0.0.1:12321" and the viewer connects, using the tcp protocol, to a
+    monitor running on the local machine at port 12321.
+    """
+    # This function is turned into a script by the Click library. The docstring
+    # above becomes the help string for the script.
+    signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+    if url is None:
+        url = "tcp://127.0.0.1:12321"
+    app: Any = QtWidgets.QApplication(sys.argv)
+    _ = SwaxsGui(url)
+    sys.exit(app.exec_())
diff --git a/src/om/lib/radial_profile.py b/src/om/lib/radial_profile.py
new file mode 100644
index 00000000..723dccd8
--- /dev/null
+++ b/src/om/lib/radial_profile.py
@@ -0,0 +1,843 @@
+# This file is part of OM.
+#
+# OM is free software: you can redistribute it and/or modify it under the terms of
+# the GNU General Public License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# OM is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+# +# You should have received a copy of the GNU General Public License along with OM. +# If not, see . +# +# Copyright 2020 -2023 SLAC National Accelerator Laboratory +# +# Based on OnDA - Copyright 2014-2019 Deutsches Elektronen-Synchrotron DESY, +# a research centre of the Helmholtz Association. +""" +Radial average algorithms. + +This module contains classes and functions that perform common data processing +operations on radial profile information computed from detector data frames. +""" + +from collections import deque +from typing import Any, Deque, Dict, Tuple, Union, cast + +import numpy +from numpy.typing import NDArray +from scipy import constants, stats # type: ignore + +from om.algorithms.generic import RadialProfile +from om.lib.geometry import GeometryInformation +from om.lib.hdf5 import parse_parameters_and_load_hdf5_data +from om.lib.parameters import get_parameter_from_parameter_group + + +def _fit_by_least_squares( + *, + radial_profile: NDArray[numpy.float_], + vectors: NDArray[numpy.float_], + start_bin: Union[int, None] = None, + stop_bin: Union[int, None] = None, +) -> NDArray[numpy.float_]: + # This function fits a set of linearly combined vectors to a radial profile, + # using a least-squares-based approach. The fit only takes into account the + # range of radial bins defined by the xmin and xmax arguments. + if start_bin is None: + start_bin = 0 + if stop_bin is None: + stop_bin = len(radial_profile) + a: NDArray[numpy.float_] = numpy.nan_to_num(numpy.atleast_2d(vectors).T) + b: NDArray[numpy.float_] = numpy.nan_to_num(radial_profile) + a = a[start_bin:stop_bin] + b = b[start_bin:stop_bin] + coefficients: NDArray[numpy.float_] + coefficients, _, _, _ = numpy.linalg.lstsq(a, b, rcond=None) + return coefficients + + +def _cumulative_moving_average(new_radial, previous_cumulative_avg, num_events): + return ((previous_cumulative_avg * num_events) + new_radial) / (num_events + 1) + + +def _calc_rg_by_guinier( + q, + radial, + nb=None, + ne=None, +) -> float: + # Calculates Rg by fitting Guinier equation to data. + # Uses only desired q range in input arrays. + if nb is None: + nb = 0 + if ne is None: + ne = len(q) + i: int = 0 + while True: + try: + m: float + m, _ = stats.linregress(q[nb:ne] ** 2, numpy.log(radial[nb:ne]))[:2] + except: # noqa: E722 + m = 0.0 + if m < 0.0: + break + else: + # the slope should be negative + # if the slope is positive, shift the region + # forward by one point and try again + nb += 5 + ne += 5 + i += 1 + if i > 10: + # try ten times shifting, then give up + m = 0.0 + break + rg: float = (-3 * m) ** (0.5) + return rg + + +def _calc_rg_by_guinier_peak( + q, + radial, + exp=1, + nb=None, + ne=None, +) -> float: + # Roughly estimate Rg using the Guinier peak method. + # Uses only desired q range in input arrays. 
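# A minimal, self-contained sketch of the Guinier relationship the helpers in this
# module rely on (synthetic numbers with an assumed Rg of 20 angstroms, not taken
# from the code above). In the Guinier approximation I(q) ~ I(0) * exp(-q^2 * Rg^2 / 3),
# so ln I(q) plotted against q^2 is linear at low q with slope m = -Rg^2 / 3, which is
# why _calc_rg_by_guinier returns Rg = sqrt(-3 * m). The peak variant instead locates
# the maximum of q**d * I(q), which for an ideal curve sits at q_peak = sqrt(3 * d / 2) / Rg.
import numpy
from scipy import stats

rg_true = 20.0  # assumed radius of gyration, in angstroms (illustration only)
q = numpy.linspace(0.005, 0.05, 50)  # low-q range, q * Rg stays at or below 1
intensity = numpy.exp(-(q**2) * rg_true**2 / 3.0)  # ideal Guinier curve with I(0) = 1

# Slope of ln I(q) versus q^2 is -Rg^2 / 3 in the Guinier regime.
fit = stats.linregress(q**2, numpy.log(intensity))
rg_fit = (-3.0 * fit.slope) ** 0.5
print(f"true Rg: {rg_true:.1f} A, fitted Rg: {rg_fit:.1f} A")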
+ # (exp is the exponent in q^exp * I(q)) + d: int = exp + if ne is None: + ne = len(q) + qs: NDArray[numpy.float_] = q[nb:ne] + Is: NDArray[numpy.float_] = radial[nb:ne] + qdI: NDArray[numpy.float_] = qs**d * Is + try: + # fit a quick quadratic for smoothness, ax^2 + bx + c + a: float + b: float + a, b, _ = numpy.polyfit(qs, qdI, 2) + # get the peak position + qpeak: float = -b / (2 * a) + except: # noqa: E722 + # if polyfit fails, just grab the maximum position + qpeaki: int = numpy.argmax(qdI) + qpeak = qs[qpeaki] + # calculate Rg from the peak position + rg: float = (3.0 * d / 2.0) ** 0.5 / qpeak + return rg + + +def _sphere_form_factor(radius, q_mags, check_divide_by_zero=True): + # By Rick Kirian and Joe Chen + # Copied from reborn.simulate.form_factors with permission. + # Form factor :math:`f(q)` for a sphere of radius :math:`r`, at given :math:`q` + # magnitudes. The formula is + # + # f(q) = 4 * pi * (sin(qr) - qr * cos(qr)) / q^3 + # + # When q = 0, the following limit is used: + # + # f(0) = 4 / 3 * pi * r^3 + # + # The formula can be found, for example, in Table A.1 of |Guinier|. There are no + # approximations in this formula beyond the 1st Born approximation; it is not a + # small-angle formula. + # + # Note that you need to multiply this by the electron density of the sphere if you + # want reasonable amplitudes. + # E.g., water molecules have 10 electrons, a molecular weight of 18 g/mol and a + # density of 1 g/ml, so you can google search the electron density of water, which + # is 10*(1 g/cm^3)/(18 g/6.022e23) = 3.346e29 per m^3 . + qr = q_mags * radius + if check_divide_by_zero is True: + amp = numpy.zeros_like(qr) + amp[qr == 0] = (4 * numpy.pi * radius**3) / 3 + w = qr != 0 + amp[w] = ( + 4 + * numpy.pi + * radius**3 + * (numpy.sin(qr[w]) - qr[w] * numpy.cos(qr[w])) + / qr[w] ** 3 + ) + else: + amp = ( + 4 * numpy.pi * radius**3 * (numpy.sin(qr) - qr * numpy.cos(qr)) / qr**3 + ) + return amp + + +class _SphericalDroplets: + # By Rick Kirian and Joe Chen + # Copied from reborn.analysis.optimize with permission. + def __init__(self, q=None, r=None): + if q is None: + q = numpy.linspace(0, 1e10, 517) + if r is None: + r = numpy.linspace( + 50, 3000, 20 + ) # set of spherical radii to test in angstroms + self.q = q.copy() + self.r = r.copy() # radius range of sphere to scan through + + self.N = len(self.r) + self.I_R_precompute = numpy.zeros((self.N, len(self.q))) + for i in range(self.N): + self.I_R_precompute[i, :] = ( + _sphere_form_factor( + radius=self.r[i], q_mags=self.q, check_divide_by_zero=True + ) + ) ** 2 + + def fit_profile(self, I_D, mask=None): + if mask is None: + mask = numpy.ones_like(I_D) + + w = mask > 0 + + A_save = numpy.zeros(self.N) + error_vec = numpy.zeros(self.N) + for i in range(self.N): + I_R = self.I_R_precompute[i, :] + A = numpy.sum(I_D[w] * I_R[w]) / numpy.sum(I_R[w] ** 2) + diff_sq = (A * I_R[w] - I_D[w]) ** 2 + error_vec[i] = numpy.sum(diff_sq) + A_save[i] = A + + ind_min = numpy.argmin(error_vec) + + A_min = A_save[ind_min] + r_min = self.r[ind_min] + e_min = error_vec[ind_min] + I_R_min = self.I_R_precompute[ind_min, :] + + r_dic = dict( + A_min=A_min, e_min=e_min, error_vec=error_vec, I_R_min=I_R_min.copy() + ) + + return r_min, r_dic + + +class RadialProfileAnalysis: + """ + See documentation of the '__init__' function. + """ + + def __init__( + self, + *, + geometry_information: GeometryInformation, + radial_parameters: Dict[str, Any], + ) -> None: + """ + Radial profile analysis. 
+ + This class stores all the information required to compute radial profiles + from detector data frames, and to analyze them. The analysis performed by this + class includes optional subtraction of a background profile, detection of a + sample droplet (by comparing the intensity of two specific regions of the + radial profile), and estimation of the size of the sample droplet. + + After the class has been initialized, it can be invoked to compute the radial + profile of a data frame, and to analyze it. + + Arguments: + + radial_parameters: A set of OM configuration parameters collected together + in a parameter group. The parameter group must contain the following + entries: + + * `background_subtraction`: Whether a background profile should be + subtracted from the radial profile being analyzed. if the value of + this parameter is true, a set of vectors describing the background + profile are fitted to the profile being analyzed, within a specified + region of the radial profile. The background profile is then + subtracted, and all subsequent analysis steps are performed on the + background-subtracted radial profile. Defaults to False. + + * `background_profile_filename`: The relative or absolute path to an + HDF5 containing vectors that describe a background profile. + + - The vectors must be saved in the file in the format of a 2D array, + with each row storing a single 1D vector. + + If the value of the `background_subtraction` parameter is True, this + parameter must be provided and cannot be None. + + * `background_profile_hdf5_path`: The internal HDF5 path to the data + block storing information about the background profile. + + If the value of the `background_subtraction` parameter is True, this + parameter must be provided and cannot be None. + + * `background_subtraction_min_fit_bin`: The start radial bin for the + region where the background profile is fitted to radial profile being + analyzed. + + If the value of the `background_subtraction` parameter is True, this + parameter must be provided and cannot be None. + + * `background_subtraction_max_fit_bin`: The end radial bin for the + region where the background profile is fitted to radial profile being + analyzed. + + If the value of the `background_subtraction` parameter is True, this + parameter must be provided and cannot be None. + + * `sample_detection`: Whether sample droplet detection should be part + of the analysis carried out by this class. Defaults to False. + + * `total_intensity_jet_threshold`: An intensity threshold used to + determine if a liquid jet is present in the area irradiated by the + beam. If the total intensity recorded in the detector data frame is + below this threshold, no jet, and therefore no sample is assumed to + be present in the detector frame. + + If the value of the `sample_detection` parameter is True, this + parameter must be provided and cannot be None. + + * `roi1_qmin`: Start, in q coordinates, of the first region of interest + in the radial profile for the detection of a sample droplet. + + If the value of the `sample_detection` parameter is True, this + parameter must be provided and cannot be None. + + * `roi1_qmax`: End, in q coordinates, of the first region of interest + in the radial profile for the detection of a sample droplet. + + If the value of the `sample_detection` parameter is True, this + parameter must be provided and cannot be None. 
+
+                * `roi2_qmin`: Start, in q coordinates, of the second region of
+                  interest in the radial profile for the detection of a sample droplet.
+
+                  If the value of the `sample_detection` parameter is True, this
+                  parameter must be provided and cannot be None.
+
+                * `roi2_qmax`: End, in q coordinates, of the second region of interest
+                  in the radial profile for the detection of a sample droplet.
+
+                  If the value of the `sample_detection` parameter is True, this
+                  parameter must be provided and cannot be None.
+
+                * `minimum_roi1_to_roi2_intensity_ratio_for_sample`: The minimum value
+                  of the ratio between the intensities in the first and second regions
+                  of interest for a sample droplet to be detected in the detector frame.
+
+                  If the value of the `sample_detection` parameter is True, this
+                  parameter must be provided and cannot be None.
+
+                * `maximum_roi1_to_roi2_intensity_ratio_for_sample`: The maximum value
+                  of the ratio between the intensities in the first and second regions
+                  of interest for a sample droplet to be detected in the detector frame.
+
+                  If the value of the `sample_detection` parameter is True, this
+                  parameter must be provided and cannot be None.
+
+                * `estimate_particle_size`: Whether the analysis should include an
+                  estimation of the sample droplet size, if a droplet was detected in
+                  the detector frame. Defaults to False.
+
+                * `size_estimation_method`: The strategy that should be used to
+                  estimate the size of a sample droplet. This parameter can currently
+                  have one of the following values:
+
+                  - `guinier`: The droplet size is estimated using the Guinier method.
+
+                  - `peak`: The droplet size is estimated using the Guinier peak
+                    method.
+
+                  - `sphere`: The droplet size is estimated using the Sphere method
+                    from the Reborn software package.
+
+                  If the value of the `estimate_particle_size` parameter is True, this
+                  parameter must be provided and cannot be None.
+ """ + + self._background_subtraction: bool = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="background_subtraction", + parameter_type=bool, + required=True, + ) + + if self._background_subtraction: + self._background_profile_vectors: NDArray[numpy.float_] = cast( + NDArray[numpy.float_], + parse_parameters_and_load_hdf5_data( + parameters=radial_parameters, + hdf5_filename_parameter="background_profile_filename", + hdf5_path_parameter="background_profile_hdf5_path", + ), + ) + + self._background_subtraction_min_bin: Union[ + int, None + ] = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="background_subtraction_min_fit_bin", + parameter_type=int, + ) + + self._background_subtraction_max_bin: Union[ + int, None + ] = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="background_subtraction_max_fit_bin", + parameter_type=int, + ) + + # Sample detection + + self._sample_detection: bool = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="sample_detection", + parameter_type=bool, + default=False, + ) + + if self._sample_detection: + self._total_intensity_jet_threshold: float = ( + get_parameter_from_parameter_group( + group=radial_parameters, + parameter="total_intensity_jet_threshold", + parameter_type=float, + required=True, + ) + ) + self._roi1_qmin: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="roi1_qmin", + parameter_type=float, + required=True, + ) + self._roi1_qmax: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="roi1_qmax", + parameter_type=float, + required=True, + ) + self._roi2_qmin: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="roi2_qmin", + parameter_type=float, + required=True, + ) + self._roi2_qmax: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="roi2_qmax", + parameter_type=float, + required=True, + ) + + self._ratio_threshold_min: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="minimum_roi1_to_roi2_intensity_ratio_for_sample", + parameter_type=float, + required=True, + ) + self._ratio_threshold_max: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="maximum_roi1_to_roi2_intensity_ratio_for_sample", + parameter_type=float, + required=True, + ) + + self._estimate_particle_size: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="estimate_particle_size", + parameter_type=bool, + required=True, + ) + + if self._estimate_particle_size: + self._size_estimation_method: str = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="size_estimation_method", + parameter_type=str, + default="guinier", + ) + + self._coffset = geometry_information.get_detector_distance_offset() + self._pixel_size = geometry_information.get_pixel_size() + + self._radial_profile = RadialProfile( + radius_pixel_map=geometry_information.get_pixel_maps()["radius"], + radial_parameters=radial_parameters, + ) + + self._radial_bin_labels = self._radial_profile.get_radial_bin_labels() + self._radial_bin_centers = self._radial_profile.calculate_profile( + data=self._radial_bin_labels + ) + + self._radial_profile_bad_pixel_map: Union[ + NDArray[numpy.bool_], None + ] = self._radial_profile.get_bad_pixel_map() + + # initialize spherical droplets + self._spherical_droplets: Union[_SphericalDroplets, None] = None + + def analyze_radial_profile( + 
self, + *, + data: Union[NDArray[numpy.float_], NDArray[numpy.int_]], + beam_energy: float, + detector_distance: float, + downstream_intensity: float, + ) -> Tuple[ + NDArray[numpy.float_], + NDArray[numpy.float_], + NDArray[numpy.float_], + bool, + float, + float, + float, + ]: + """ + Calculate and analyze a radial profile from a detector data frame. + + This function calculates and analyzes the radial profile of the provided + detector data frame. + + + Arguments: + + data: the detector data frame for which the radial profile must be computed + and analyzed. + + Returns: + + A radial profile whose value is the average radial intensity calculated + from the data frame. + """ + + radial_profile: NDArray[numpy.float_] = self._radial_profile.calculate_profile( + data=data + ) + + errors: NDArray[numpy.float_] + errors, _, _ = stats.binned_statistic( + self._radial_bin_labels[self._radial_profile_bad_pixel_map].ravel(), + data[self._radial_profile_bad_pixel_map].ravel(), + "std", + ) + + if self._background_subtraction: + coefficients = _fit_by_least_squares( + radial_profile=radial_profile, + vectors=self._background_profile_vectors, + start_bin=self._background_subtraction_min_bin, + stop_bin=self._background_subtraction_max_bin, + ) + background_fit: NDArray[numpy.float_] = radial_profile * 0 + index: int + for index in range(len(coefficients)): + background_fit += ( + coefficients[index] * self._background_profile_vectors[index] + ) + + radial_profile = radial_profile - background_fit + + wavelength: float = ( + constants.c * constants.h / (beam_energy * constants.electron_volt) + ) + real_detector_distance: float = detector_distance * 1e-3 + self._coffset + theta: NDArray[numpy.float_] = ( + numpy.arctan( + self._pixel_size * self._radial_bin_centers / real_detector_distance + ) + * 0.5 + ) + q: NDArray[numpy.float_] = ( + numpy.sin(theta) * 4 * numpy.pi / wavelength + ) * 1e-10 + + # grab the intensities from the regions of interest, e.g. water ring and low q + # ring normalize by the downstream monitor + roi1_intensity: float = ( + numpy.mean( + radial_profile[ + numpy.where((q >= self._roi1_qmin) & (q <= self._roi1_qmax)) + ] + ) + # / downstream_intensity + ) + roi2_intensity: float = ( + numpy.mean( + radial_profile[ + numpy.where((q >= self._roi2_qmin) & (q <= self._roi2_qmax)) + ] + ) + # / downstream_intensity + ) + + frame_sum = data.mean() + frame_has_jet: bool = frame_sum > self._total_intensity_jet_threshold + if frame_has_jet: + if self._sample_detection: + first_to_second_peak_ratio = float(roi1_intensity / roi2_intensity) + sample_detected: bool = ( + # Having a threshold maximum helps filtering out nozzle hits too + (first_to_second_peak_ratio > self._ratio_threshold_min) + and (first_to_second_peak_ratio < self._ratio_threshold_max) + ) + else: + sample_detected = False + + if self._estimate_particle_size: + q_index: NDArray[numpy.int_] = numpy.where( + (q >= self._guinier_qmin) & (q <= self._guinier_qmax) + ) + if len(q_index[0]) != 0: + q_min_index: numpy.int_ = numpy.min(q_index) + q_max_index: numpy.int_ = numpy.max(q_index) + if self._size_estimation_method == "sphere": + # try to estimate radius using spherical droplets + if self._spherical_droplets is None: + # For the first frame, instantiate the class with the now + # known q values. + # Note: this assumes q does not change frame to frame. 
+ self._spherical_droplets = _SphericalDroplets(q=q) + # Can add r=set of radii later + # Note: this is r, radius, not rg + r, _ = self._spherical_droplets.fit_profile( + I_D=radial_profile, mask=None + ) + rg = r + # for simplicity since rg is returned below for guinier + # stuff. + elif self._size_estimation_method == "peak": + # try to estimate Rg using Guinier Peak method + rg: float = _calc_rg_by_guinier_peak( + q, radial_profile, nb=q_min_index, ne=q_max_index + ) + else: + # try to estimate Rg using standard Guinier plot + rg = _calc_rg_by_guinier( + q, radial_profile, nb=q_min_index, ne=q_max_index + ) + else: + rg = 0.0 + else: + rg = 0.0 + else: + return ( + radial_profile, + errors, + q, + False, + roi1_intensity, + roi2_intensity, + 0.6, + frame_sum, + ) + return ( + radial_profile, + errors, + q, + sample_detected, + roi1_intensity, + roi2_intensity, + rg, + frame_sum, + ) + + +class RadialProfileAnalysisPlots: + """ + See documentation for the `__init__` function. + """ + + def __init__( + self, + *, + radial_parameters: Dict[str, Any], + ) -> None: + """ + #TODO: Documentation. + """ + self._radius_bin_size: bool = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="radius_bin_size", + parameter_type=float, + ) + + self._running_average_window_size: int = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="running_average_window_size", + parameter_type=int, + required=True, + ) + + self._num_radials_to_send: int = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="num_radials_to_send", + parameter_type=int, + required=True, + ) + + self._num_hits_in_cum_radial_avg: int = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="num_hits_in_cum_radial_avg", + parameter_type=int, + ) + + self._num_events_to_plot: int = 5000 + + self._hit_rate_running_window: Deque[float] = deque( + [0.0] * self._running_average_window_size, + maxlen=self._running_average_window_size, + ) + self._avg_hit_rate: int = 0 + self._num_hits: int = 0 + self._hit_rate_timestamp_history: Deque[float] = deque( + self._num_events_to_plot * [0.0], maxlen=self._num_events_to_plot + ) + # self._hit_rate_history: Deque[float] = deque(5000 * [0.0], maxlen=5000) + + self._hit_rate_history: Union[Deque[NDArray[numpy.float_]], None] = None + self._q_history: Union[Deque[NDArray[numpy.float_]], None] = None + self._radials_history: Union[Deque[NDArray[numpy.float_]], None] = None + self._image_sum_history: Union[Deque[float], None] = None + self._downstream_intensity_history: Union[Deque[float], None] = None + self._roi1_intensity_history: Union[Deque[float], None] = None + self._roi2_intensity_history: Union[Deque[float], None] = None + self._rg_history: Union[Deque[float], None] = None + self._cumulative_hits_radial = None + + def update_plots( + self, + *, + radial_profile: NDArray[numpy.float_], + detector_data_sum: float, + q: NDArray[numpy.float_], + downstream_intensity: float, + roi1_intensity: float, + roi2_intensity: float, + sample_detected: bool, + rg: float, + ) -> Tuple[ + Deque[NDArray[numpy.float_]], + Deque[NDArray[numpy.float_]], + Deque[float], + Deque[float], + Deque[float], + Deque[float], + Deque[bool], + Deque[float], + ]: + """ + #TODO: Documentation. 
+ """ + + if self._hit_rate_history is None: + num_radial_bins: int = len(radial_profile) + + self._hit_rate_history = deque( + [False] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + self._hit_rate_running_window: Deque[float] = deque( + [0.0] * self._running_average_window_size, + maxlen=self._running_average_window_size, + ) + self._q_history = deque( + [numpy.zeros(num_radial_bins)] * self._num_radials_to_send, + maxlen=self._num_radials_to_send, + ) + self._radials_history = deque( + [numpy.zeros(num_radial_bins)] * self._num_radials_to_send, + maxlen=self._num_radials_to_send, + ) + self._image_sum_history = deque( + [0.0] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + self._downstream_intensity_history = deque( + [0.0] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + self._roi1_intensity_history = deque( + [0.0] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + self._roi2_intensity_history = deque( + [0.0] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + self._rg_history = deque( + [0.0] * self._num_events_to_plot, + maxlen=self._num_events_to_plot, + ) + # for the first event, the cumulative radial will just be the radial of that first event + self._cumulative_hits_radial = radial_profile + + self._hit_rate_running_window.append(float(sample_detected)) + avg_hit_rate = ( + sum(self._hit_rate_running_window) / self._running_average_window_size + ) + # self._hit_rate_timestamp_history.append(timestamp) + self._hit_rate_history.append(avg_hit_rate * 100.0) + + self._q_history.append(q) + self._radials_history.append(radial_profile) + self._image_sum_history.append(detector_data_sum) + self._downstream_intensity_history.append(downstream_intensity) + self._roi1_intensity_history.append(roi1_intensity) + self._roi2_intensity_history.append(roi2_intensity) + # self._hit_rate_history.append(sample_detected) + self._rg_history.append(rg) + # only add to cumulative radial if a hit, i.e. sample detected + if sample_detected: + self._num_hits += 1 + if self._num_hits > self._num_hits_in_cum_radial_avg: + # reset cumulative hits radial every N hits + self._cumulative_hits_radial = radial_profile + else: + self._cumulative_hits_radial = cumulative_moving_average( + new_radial=radial_profile, + previous_cumulative_avg=self._cumulative_hits_radial, + num_events=self._num_hits, + ) + + return ( + self._q_history, + self._radials_history, + self._image_sum_history, + self._downstream_intensity_history, + self._roi1_intensity_history, + self._roi2_intensity_history, + self._hit_rate_history, + self._rg_history, + self._cumulative_hits_radial, + ) + + def clear_plots(self) -> None: + """ + # TODO: Add documentation. 
+ """ + # self._hit_rate_history = deque([], maxlen=self._running_average_window_size) + self._hit_rate_running_window = deque( + [0.0] * self._running_average_window_size, + maxlen=self._running_average_window_size, + ) + self._avg_hit_rate = 0 + self._num_hits = 0 + self._hit_rate_timestamp_history = deque( + self._num_events_to_plot * [0.0], maxlen=self._num_events_to_plot + ) + self._hit_rate_history = deque( + self._num_events_to_plot * [0.0], maxlen=self._num_events_to_plot + ) + + self._q_history = deque([], maxlen=self._num_radials_to_send) + self._radials_history = deque([], maxlen=self._num_radials_to_send) + self._image_sum_history = deque([], maxlen=self._num_events_to_plot) + self._downstream_intensity_history = deque([], maxlen=self._num_events_to_plot) + self._roi1_intensity_history = deque([], maxlen=self._num_events_to_plot) + self._roi2_intensity_history = deque([], maxlen=self._num_events_to_plot) + self._rg_history = deque([], maxlen=self._num_events_to_plot) diff --git a/src/om/processing_layer/__init__.py b/src/om/processing_layer/__init__.py index 417967f3..dfe5e2dd 100644 --- a/src/om/processing_layer/__init__.py +++ b/src/om/processing_layer/__init__.py @@ -27,3 +27,4 @@ from .crystallography import CrystallographyProcessing # noqa: F401 from .testing import TestProcessing # noqa: F401 from .xes import XesProcessing # noqa: F401 +from .swaxs import SwaxsProcessing \ No newline at end of file From 27eef5ac072297117ef7525c36b05170a1e3d4fc Mon Sep 17 00:00:00 2001 From: Thomas D Grant Date: Thu, 2 May 2024 11:01:20 -0700 Subject: [PATCH 2/3] fixed missing guinier_qmin bugs and removed post_sample_intensity temporarily --- src/om/lib/radial_profile.py | 16 +++++++++++++++- src/om/processing_layer/swaxs.py | 8 ++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/om/lib/radial_profile.py b/src/om/lib/radial_profile.py index 723dccd8..bf72fdb1 100644 --- a/src/om/lib/radial_profile.py +++ b/src/om/lib/radial_profile.py @@ -461,6 +461,20 @@ class includes optional subtraction of a background profile, detection of a default="guinier", ) + self._guinier_qmin: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="roi2_qmin", + parameter_type=float, + default=0.0, + ) + self._guinier_qmax: float = get_parameter_from_parameter_group( + group=radial_parameters, + parameter="guinier_qmax", + parameter_type=float, + default=0.1, + ) + + self._coffset = geometry_information.get_detector_distance_offset() self._pixel_size = geometry_information.get_pixel_size() @@ -798,7 +812,7 @@ def update_plots( # reset cumulative hits radial every N hits self._cumulative_hits_radial = radial_profile else: - self._cumulative_hits_radial = cumulative_moving_average( + self._cumulative_hits_radial = _cumulative_moving_average( new_radial=radial_profile, previous_cumulative_avg=self._cumulative_hits_radial, num_events=self._num_hits, diff --git a/src/om/processing_layer/swaxs.py b/src/om/processing_layer/swaxs.py index 188b662f..5aecfa70 100644 --- a/src/om/processing_layer/swaxs.py +++ b/src/om/processing_layer/swaxs.py @@ -257,13 +257,13 @@ def process_data( data=data["detector_data"], beam_energy=data["beam_energy"], detector_distance=data["detector_distance"], - downstream_intensity=data["post_sample_intensity"], + downstream_intensity=1.0 #data["post_sample_intensity"], ) processed_data["radial_profile"] = radial_profile processed_data["detector_data_sum"] = detector_data_sum processed_data["q"] = q - processed_data["downstream_intensity"] = 
data["post_sample_intensity"] + processed_data["downstream_intensity"] = 1.0 #data["post_sample_intensity"] processed_data["roi1_intensity"] = roi1_intensity processed_data["roi2_intensity"] = roi2_intensity processed_data["sample_detected"] = sample_detected @@ -698,13 +698,13 @@ def process_data( data=data["detector_data"], beam_energy=data["beam_energy"], detector_distance=data["detector_distance"], - downstream_intensity=data["post_sample_intensity"], + downstream_intensity=1.0 #data["post_sample_intensity"], ) processed_data["radial_profile"] = radial_profile processed_data["detector_data_sum"] = detector_data_sum processed_data["q"] = q - processed_data["downstream_intensity"] = data["post_sample_intensity"] + processed_data["downstream_intensity"] = 1.0 #data["post_sample_intensity"] processed_data["roi1_intensity"] = roi1_intensity processed_data["roi2_intensity"] = roi2_intensity processed_data["sample_detected"] = sample_detected From 65aed05d5cc4d1572c488970d73adeb8b65ea1d6 Mon Sep 17 00:00:00 2001 From: Thomas D Grant Date: Fri, 3 May 2024 14:59:33 -0700 Subject: [PATCH 3/3] fixed several bugs. now runs with jungfrau and epix10ka_1 --- src/om/algorithms/generic.py | 3 + src/om/data_retrieval_layer/__init__.py | 2 + .../data_event_handlers_psana.py | 2 +- .../data_retrieval_psana.py | 202 +++++++- .../data_sources_psana.py | 91 ++++ src/om/lib/cheetah.py | 448 ++++++++++++++++++ src/om/processing_layer/__init__.py | 3 +- src/om/processing_layer/swaxs.py | 35 +- 8 files changed, 770 insertions(+), 16 deletions(-) diff --git a/src/om/algorithms/generic.py b/src/om/algorithms/generic.py index d5cad3e9..ad3e6206 100644 --- a/src/om/algorithms/generic.py +++ b/src/om/algorithms/generic.py @@ -148,6 +148,9 @@ def get_radial_bin_labels(self) -> NDArray[numpy.int_]: """ return self._radial_bin_labels + def get_num_radial_bins(self): + return self._num_bins + def get_bad_pixel_map(self) -> Union[NDArray[numpy.bool_], None]: """ Gets the bad pixel map provided to the algorithm. diff --git a/src/om/data_retrieval_layer/__init__.py b/src/om/data_retrieval_layer/__init__.py index fe53b394..1fd1acf0 100644 --- a/src/om/data_retrieval_layer/__init__.py +++ b/src/om/data_retrieval_layer/__init__.py @@ -44,6 +44,8 @@ from .data_retrieval_psana import ( # noqa: F401 CxiLclsCspadDataRetrieval, CxiLclsDataRetrieval, + CxiLclsEpix10kaDataRetrieval, + CxiLclsEpix10kaSinglePanelDataRetrieval, LclsEpix100DataRetrieval, MfxLclsDataRetrieval, MfxLclsRayonixDataRetrieval, diff --git a/src/om/data_retrieval_layer/data_event_handlers_psana.py b/src/om/data_retrieval_layer/data_event_handlers_psana.py index f10beed1..83cd6e7c 100644 --- a/src/om/data_retrieval_layer/data_event_handlers_psana.py +++ b/src/om/data_retrieval_layer/data_event_handlers_psana.py @@ -56,7 +56,7 @@ def _psana_offline_event_generator( # processing node is assigned the residual events. 
run: Any for run in psana_source.runs(): - times: Any = run.times() + times: Any = run.times()#[:100] num_events_curr_node: int = int( numpy.ceil(len(times) / float(mpi_pool_size - 1)) ) diff --git a/src/om/data_retrieval_layer/data_retrieval_psana.py b/src/om/data_retrieval_layer/data_retrieval_psana.py index b41deb6d..c38baa4b 100644 --- a/src/om/data_retrieval_layer/data_retrieval_psana.py +++ b/src/om/data_retrieval_layer/data_retrieval_psana.py @@ -31,6 +31,7 @@ DiodeTotalIntensityPsana, EpicsVariablePsana, Epix10kaPsana, + Epix10kaSinglePanelPsana, Epix100Psana, EventIdPsana, EvrCodesPsana, @@ -199,10 +200,109 @@ def __init__(self, *, monitor_parameters: MonitorParameters, source: str): data_source_name="detector_distance", monitor_parameters=monitor_parameters, ), - "detector_distance": FloatEntryFromConfiguration( - data_source_name="fallback_detector_distance_in_mm", + # "detector_distance": FloatEntryFromConfiguration( + # data_source_name="fallback_detector_distance_in_mm", + # monitor_parameters=monitor_parameters, + # ), + "timetool_data": EpicsVariablePsana( + data_source_name="timetool", monitor_parameters=monitor_parameters + ), + "optical_laser_active": EvrCodesPsana( + data_source_name="active_optical_laser", + monitor_parameters=monitor_parameters, + ), + "xrays_active": EvrCodesPsana( + data_source_name="active_xrays", + monitor_parameters=monitor_parameters, + ), + "post_sample_intensity": DiodeTotalIntensityPsana( + data_source_name="post_sample_intensity", + monitor_parameters=monitor_parameters, + ), + "lcls_extra": LclsExtraPsana( + data_source_name="lcls_extra", + monitor_parameters=monitor_parameters, + ), + } + + self._data_event_handler: OmDataEventHandlerProtocol = PsanaDataEventHandler( + source=source, + monitor_parameters=monitor_parameters, + data_sources=data_sources, + ) + + def get_data_event_handler(self) -> OmDataEventHandlerProtocol: + """ + Retrieves the Data Event Handler used by the Data Retrieval class. + + Please see the documentation of the base Protocol class for additional + information about this method. + + Returns: + + The Data Event Handler used by the Data Retrieval class. + """ + return self._data_event_handler + +class CxiLclsEpix10kaDataRetrieval(OmDataRetrievalProtocol): + """ + See documentation of the `__init__` function. + """ + + def __init__(self, *, monitor_parameters: MonitorParameters, source: str): + """ + Data Retrieval from psana at the CXI beamline of the LCLS facility (CSPAD). + + This class implements OM's Data Retrieval Layer for the CXI beamline of the + LCLS facility, using the CSPAD x-ray detector. This detector was used at the + beamline until early 2020. + + This class implements the interface described by its base Protocol class. + Please see the documentation of that class for additional information about + the interface. + + * This class considers an individual data event as equivalent to the content of + a psana event, which stores data related to a single detector frame. + + * A string combining psana's timestamp and fiducial information, with the + following format: + `{timestamp: seconds}-{timestamp: nanoseconds}-{fiducials}`, is used as event + identifier. + + * Psana provides timestamp, beam energy and detector distance data for each + event, retrieved from various sensors in the system. + + * The source string required by this Data Retrieval class is a string of the + type used by psana to identify specific runs, experiments, or live data + streams. 
+ + Arguments: + + monitor_parameters: An object OM's configuration parameters. + + source: A string describing the data event source. + """ + data_sources: Dict[str, OmDataSourceProtocol] = { + "timestamp": TimestampPsana( + data_source_name="timestamp", monitor_parameters=monitor_parameters + ), + "event_id": EventIdPsana( + data_source_name="eventid", monitor_parameters=monitor_parameters + ), + "detector_data": Epix10kaPsana( + data_source_name="detector", monitor_parameters=monitor_parameters + ), + "beam_energy": BeamEnergyPsana( + data_source_name="beam_energy", monitor_parameters=monitor_parameters + ), + "detector_distance": EpicsVariablePsana( + data_source_name="detector_distance", monitor_parameters=monitor_parameters, ), + # "detector_distance": FloatEntryFromConfiguration( + # data_source_name="fallback_detector_distance_in_mm", + # monitor_parameters=monitor_parameters, + # ), "timetool_data": EpicsVariablePsana( data_source_name="timetool", monitor_parameters=monitor_parameters ), @@ -243,6 +343,104 @@ def get_data_event_handler(self) -> OmDataEventHandlerProtocol: """ return self._data_event_handler +class CxiLclsEpix10kaSinglePanelDataRetrieval(OmDataRetrievalProtocol): + """ + See documentation of the `__init__` function. + """ + + def __init__(self, *, monitor_parameters: MonitorParameters, source: str): + """ + Data Retrieval from psana at the CXI beamline of the LCLS facility (CSPAD). + + This class implements OM's Data Retrieval Layer for the CXI beamline of the + LCLS facility, using the CSPAD x-ray detector. This detector was used at the + beamline until early 2020. + + This class implements the interface described by its base Protocol class. + Please see the documentation of that class for additional information about + the interface. + + * This class considers an individual data event as equivalent to the content of + a psana event, which stores data related to a single detector frame. + + * A string combining psana's timestamp and fiducial information, with the + following format: + `{timestamp: seconds}-{timestamp: nanoseconds}-{fiducials}`, is used as event + identifier. + + * Psana provides timestamp, beam energy and detector distance data for each + event, retrieved from various sensors in the system. + + * The source string required by this Data Retrieval class is a string of the + type used by psana to identify specific runs, experiments, or live data + streams. + + Arguments: + + monitor_parameters: An object OM's configuration parameters. + + source: A string describing the data event source. 
+ """ + data_sources: Dict[str, OmDataSourceProtocol] = { + "timestamp": TimestampPsana( + data_source_name="timestamp", monitor_parameters=monitor_parameters + ), + "event_id": EventIdPsana( + data_source_name="eventid", monitor_parameters=monitor_parameters + ), + "detector_data": Epix10kaSinglePanelPsana( + data_source_name="detector", monitor_parameters=monitor_parameters + ), + "beam_energy": BeamEnergyPsana( + data_source_name="beam_energy", monitor_parameters=monitor_parameters + ), + "detector_distance": EpicsVariablePsana( + data_source_name="detector_distance", + monitor_parameters=monitor_parameters, + ), + # "detector_distance": FloatEntryFromConfiguration( + # data_source_name="fallback_detector_distance_in_mm", + # monitor_parameters=monitor_parameters, + # ), + "timetool_data": EpicsVariablePsana( + data_source_name="timetool", monitor_parameters=monitor_parameters + ), + "optical_laser_active": EvrCodesPsana( + data_source_name="active_optical_laser", + monitor_parameters=monitor_parameters, + ), + "xrays_active": EvrCodesPsana( + data_source_name="active_xrays", + monitor_parameters=monitor_parameters, + ), + "post_sample_intensity": DiodeTotalIntensityPsana( + data_source_name="post_sample_intensity", + monitor_parameters=monitor_parameters, + ), + "lcls_extra": LclsExtraPsana( + data_source_name="lcls_extra", + monitor_parameters=monitor_parameters, + ), + } + + self._data_event_handler: OmDataEventHandlerProtocol = PsanaDataEventHandler( + source=source, + monitor_parameters=monitor_parameters, + data_sources=data_sources, + ) + + def get_data_event_handler(self) -> OmDataEventHandlerProtocol: + """ + Retrieves the Data Event Handler used by the Data Retrieval class. + + Please see the documentation of the base Protocol class for additional + information about this method. + + Returns: + + The Data Event Handler used by the Data Retrieval class. + """ + return self._data_event_handler class LclsEpix100DataRetrieval(OmDataRetrievalProtocol): """ diff --git a/src/om/data_retrieval_layer/data_sources_psana.py b/src/om/data_retrieval_layer/data_sources_psana.py index b3a9e405..d1ae6a44 100644 --- a/src/om/data_retrieval_layer/data_sources_psana.py +++ b/src/om/data_retrieval_layer/data_sources_psana.py @@ -322,6 +322,97 @@ def get_data( return epixka2m_reshaped +class Epix10kaSinglePanelPsana(OmDataSourceProtocol): + """ + See documentation of the `__init__` function. + """ + + def __init__( + self, + *, + data_source_name: str, + monitor_parameters: MonitorParameters, + ): + """ + Epix10KA 2M detector data frames from psana at the LCLS facility. + + This class deals with the retrieval of Epix10Ka 2M detector data frames from + the psana software framework. + + This class implements the interface described by its base Protocol class. + Please see the documentation of that class for additional information about + the interface. + + Arguments: + + data_source_name: A name that identifies the current data source. It is + used, for example, in communications with the user or for the retrieval + of a sensor's initialization parameters. + + monitor_parameters: An object storing OM's configuration parameters. + """ + self._data_source_name = data_source_name + self._monitor_parameters = monitor_parameters + + def initialize_data_source(self) -> None: + """ + Initializes the Epix10KA 2M detector frame data source. + + Please see the documentation of the base Protocol class for additional + information about this method. 
+ + This function initializes data retrieval for the detector whose psana name + matches the `psana_{source_protocols_name}_name` entry in OM's + `data_retrieval_layer` configuration parameter group, or for the detector with + a given psana name, if the `source_protocols_name` argument has the format + `psana-{psana detector name}`. + """ + self._data_retrieval_function: Callable[ + [Any], Any + ] = _get_psana_data_retrieval_function( + source_protocols_name=self._data_source_name, + monitor_parameters=self._monitor_parameters, + ) + + def get_data( + self, *, event: Dict[str, Any] + ) -> Union[NDArray[numpy.float_], NDArray[numpy.int_]]: + """ + Retrieves an Epix10KA 2M detector data frame from psana. + + This method overrides the corresponding method of the base class: please also + refer to the documentation of that class for more information. + + This function retrieves from psana the detector data frame associated with the + provided event. It returns the frame as a 2D array storing pixel information. + Data is retrieved in calibrated or non-calibrated form depending on the + value of the `{source_protocols_name}_calibration` entry in OM's + `data_retrieval_layer` configuration parameter group. + + Arguments: + + event: A dictionary storing the event data. + + Returns: + + A detector data frame. + + Raises: + + OmDataExtractionError: Raised when data cannot be retrieved from psana. + """ + epixka2m_psana: Union[ + NDArray[numpy.float_], NDArray[numpy.int_] + ] = self._data_retrieval_function(event["data"]) + if epixka2m_psana is None: + raise OmDataExtractionError("Could not retrieve detector data from psana.") + + # Rearranges the data into 'slab' format. + epixka2m_reshaped: Union[ + NDArray[numpy.float_], NDArray[numpy.int_] + ] = epixka2m_psana.reshape(352, 384) + + return epixka2m_reshaped class Jungfrau4MPsana(OmDataSourceProtocol): """ diff --git a/src/om/lib/cheetah.py b/src/om/lib/cheetah.py index ed3899da..916b3a16 100644 --- a/src/om/lib/cheetah.py +++ b/src/om/lib/cheetah.py @@ -30,6 +30,7 @@ from om.lib.exceptions import OmHdf5UnsupportedDataFormat from om.lib.parameters import get_parameter_from_parameter_group from om.lib.rich_console import console, get_current_timestamp +from om.lib.geometry import TypeDetector class TypeFrameListData(NamedTuple): @@ -1017,6 +1018,453 @@ def _resize_datasets(self, *, extension_size: int = 1) -> None: dataset.resize(self._num_frames + extension_size, axis=0) self._num_frames += extension_size +class SwaxsHDF5Writer: + """ + See documentation of the `__init__` function. + """ + + def __init__( # noqa: C901 + self, + *, + node_rank: int, + cheetah_parameters: Dict[str, Any], + num_radial_bins: int, + ) -> None: + """ + HDF5 file writer for Cheetah. + + This class creates HDF5 data files to store the information processed by + Cheetah. For each data event, this class saves into an HDF5 file a processed + detector data frame, the list of Bragg peaks detected in the frame, and some + additional information (timestamp, beam energy, detector distance, pump laser + state). + + Arguments: + + cheetah_parameters: A set of OM configuration parameters collected together + in a parameter group. The parameter group must contain the following + entries: + + directory_for_processed_data: A relative or absolute path to the + directory where the output files are written. + + compression: The compression filter to be applied to the data in the + output file. 
+ + hdf5_fields: A dictionary storing information about the internal HDF5 + path where each data entry must be written. + + * The keys in the dictionary must store the names of data entries + to write. + + * The corresponding dictionary values must contain the internal + HDF5 paths where the entries must be written. + + processed_filename_prefix: A string that is prepended to the name of + the output files. Optional. If the value of this entry is None, the + string 'processed_' will be used as prefix. Defaults to None. + + processed_filename_extension: An extension string that id appended to + the name of the output files. Optional. If the value of this entry + is None, the string 'h5' is be used as extension. Defaults to + None. + + compression_opts: The compression level to be used, if data compression + is applied to the output files. The information in this entry only + applies if the corresponding `compression` entry is not None, + otherwise, it is ignored. Optional. If the value of this entry is + None, the compression level is set to 4. Defaults to None. + + compression_shuffle: Whether the `shuffle` filter is applied. If the + value of this entry is True, the filter is applied to the data + being written, otherwise it is not. Defaults to None. + + max_num_peaks: The maximum number of detected Bragg peaks that are + written in the HDF5 file for each event. Optional. If the value + of this entry is None, only the first 1024 peaks detected in each + frame are written to the output file. Defaults to None. + + node_rank: The rank of the OM node that writes the data in the output + files. + """ + # Output file + directory_for_processed_data: str = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="processed_directory", + parameter_type=str, + required=True, + ) + processed_filename_prefix: Union[ + str, None + ] = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="processed_filename_prefix", + parameter_type=str, + ) + if processed_filename_prefix is None: + processed_filename_prefix = "processed" + self._processed_filename: pathlib.Path = ( + pathlib.Path(directory_for_processed_data).resolve() + / f"{processed_filename_prefix}_{node_rank}.inprogress" + ) + processed_filename_extension: Union[ + str, None + ] = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="processed_filename_extension", + parameter_type=str, + ) + if processed_filename_extension is None: + processed_filename_extension = "h5" + self._processed_filename_extension: str = f".{processed_filename_extension}" + + # HDF5 fields + self._hdf5_fields = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="hdf5_fields", + parameter_type=dict, + ) + + # Data format + self._data_type: Union[ + str, DTypeLike, None + ] = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="hdf5_file_data_type", + parameter_type=str, + ) + + # Compression + compression: Union[str, None] = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="hdf5_file_compression", + parameter_type=str, + ) + if compression not in ("gzip", "bitshuffle_with_zstd"): + # TODO: print a warning or an error for unsupported compression type + # If a warning say no compression will be applied + compression = None + if compression == "gzip": + compression_level: int = get_parameter_from_parameter_group( + group=cheetah_parameters, + parameter="hdf5_file_gzip_compression_level", + parameter_type=int, + default=4, + ) + 
+            self._compression_kwargs: Dict[str, Any] = {
+                "compression": "gzip",
+                "compression_opts": compression_level,
+            }
+        elif compression == "bitshuffle_with_zstd":
+            compression_level: int = get_parameter_from_parameter_group(
+                group=cheetah_parameters,
+                parameter="hdf5_file_zstd_compression_level",
+                parameter_type=int,
+                default=3,
+            )
+            self._compression_kwargs = dict(
+                hdf5plugin.Bitshuffle(cname="zstd", clevel=compression_level)
+            )
+        else:
+            self._compression_kwargs = {}
+        self._compression_kwargs["shuffle"] = get_parameter_from_parameter_group(
+            group=cheetah_parameters,
+            parameter="hdf5_file_compression_shuffle",
+            parameter_type=bool,
+            default=False,
+        )
+
+        if self._data_type is None:
+            detector_data_type: DTypeLike = numpy.float32
+        else:
+            detector_data_type = numpy.dtype(self._data_type)
+
+        # TODO: decide what to do if file exists
+        self._h5file: Any = h5py.File(self._processed_filename, "w")
+
+        self._resizable_datasets: Dict[str, Any] = {}
+
+        # The compression settings collected above are applied to the large
+        # (frame-sized and radial-profile) datasets created below.
+        # TODO: detector_data_shape is not defined anywhere in this class: it must
+        # be determined (e.g. from the detector geometry) before a "detector_data"
+        # dataset can be created.
+        if "detector_data" in self._hdf5_fields.keys():
+            self._resizable_datasets["detector_data"] = self._h5file.create_dataset(
+                name=self._hdf5_fields["detector_data"],
+                shape=(0,) + detector_data_shape,
+                maxshape=(None,) + detector_data_shape,
+                dtype=detector_data_type,
+                chunks=(1,) + detector_data_shape,
+                **self._compression_kwargs,
+            )
+        if "event_id" in self._hdf5_fields.keys():
+            self._resizable_datasets["event_id"] = self._h5file.create_dataset(
+                name=self._hdf5_fields["event_id"],
+                shape=(0,),
+                maxshape=(None,),
+                dtype=h5py.special_dtype(vlen=str),
+            )
+        if "optical_laser_active" in self._hdf5_fields.keys():
+            self._resizable_datasets[
+                "optical_laser_active"
+            ] = self._h5file.create_dataset(
+                name=self._hdf5_fields["optical_laser_active"],
+                shape=(0,),
+                maxshape=(None,),
+                dtype=numpy.bool_,
+            )
+
+        radial_shape = (num_radial_bins,)
+        if "q" in self._hdf5_fields.keys():
+            self._resizable_datasets["q"] = self._h5file.create_dataset(
+                name=self._hdf5_fields["q"],
+                shape=(0,) + radial_shape,
+                maxshape=(None,) + radial_shape,
+                dtype=detector_data_type,
+                chunks=(1,) + radial_shape,
+                **self._compression_kwargs,
+            )
+        if "radial" in self._hdf5_fields.keys():
+            self._resizable_datasets["radial"] = self._h5file.create_dataset(
+                name=self._hdf5_fields["radial"],
+                shape=(0,) + radial_shape,
+                maxshape=(None,) + radial_shape,
+                dtype=detector_data_type,
+                chunks=(1,) + radial_shape,
+                **self._compression_kwargs,
+            )
+        if "errors" in self._hdf5_fields.keys():
+            self._resizable_datasets["errors"] = self._h5file.create_dataset(
+                name=self._hdf5_fields["errors"],
+                shape=(0,) + radial_shape,
+                maxshape=(None,) + radial_shape,
+                dtype=detector_data_type,
+                chunks=(1,) + radial_shape,
+                **self._compression_kwargs,
+            )
+        if "image_sum" in self._hdf5_fields.keys():
+            self._resizable_datasets[
+                "image_sum"
+            ] = self._h5file.create_dataset(
+                name=self._hdf5_fields["image_sum"],
+                shape=(0,),
+                maxshape=(None,),
+                # The per-frame detector sum is a scalar float, not a boolean.
+                dtype=numpy.float64,
+            )
+        if "frame_is_droplet" in self._hdf5_fields.keys():
+            self._resizable_datasets[
+                "frame_is_droplet"
+            ] = self._h5file.create_dataset(
name=self._hdf5_fields["frame_is_droplet"], + shape=(0,), + maxshape=(None,), + dtype=numpy.bool, + ) + if "frame_is_crystal" in self._hdf5_fields.keys(): + self._resizable_datasets[ + "frame_is_crystal" + ] = self._h5file.create_dataset( + name=self._hdf5_fields["frame_is_crystal"], + shape=(0,), + maxshape=(None,), + dtype=numpy.bool, + ) + # Creating all requested 1D float64 datasets: + key: str + for key in ("timestamp", "beam_energy", "pixel_size", "detector_distance", "post_sample_intensity"): + if key in self._hdf5_fields.keys(): + self._resizable_datasets[key] = self._h5file.create_dataset( + name=self._hdf5_fields[key], + shape=(0,), + maxshape=(None,), + dtype=numpy.float64, + ) + + self._extra_groups: Dict[str, Any] = {} + + self._requested_datasets: Set[str] = set(self._hdf5_fields.keys()) + + self._num_frames: int = 0 + + def _create_extra_datasets( + self, group_name: str, extra_data: Dict[str, Any] + ) -> None: + # Creates empty dataset in the extra data group for each item in extra_data dict + # using dict keys as dataset names. Supported data types: numpy.ndarray, str, + # float, int and bool + key: str + value: Any + for key, value in extra_data.items(): + if isinstance(value, numpy.ndarray): + self._resizable_datasets[group_name + "/" + key] = self._extra_groups[ + group_name + ].create_dataset( + name=key, + shape=(0, *value.shape), + maxshape=(None, *value.shape), + dtype=value.dtype, + ) + elif isinstance(value, str): + self._resizable_datasets[group_name + "/" + key] = self._extra_groups[ + group_name + ].create_dataset( + name=key, + shape=(0,), + maxshape=(None,), + dtype=h5py.special_dtype(vlen=str), + ) + elif ( + numpy.issubdtype(type(value), numpy.integer) + or numpy.issubdtype(type(value), numpy.floating) + or numpy.issubdtype(type(value), numpy.bool_) + ): + self._resizable_datasets[group_name + "/" + key] = self._extra_groups[ + group_name + ].create_dataset( + name=key, + shape=(0,), + maxshape=(None,), + dtype=type(value), + ) + else: + raise exceptions.OmHdf5UnsupportedDataFormat( + "Cannot write the '{}' data entry into the output HDF5: " + "its format is not supported.".format(key) + ) + + def _write_extra_data(self, group_name: str, extra_data: Dict[str, Any]) -> None: + key: str + value: Any + for key, value in extra_data.items(): + self._extra_groups[group_name][key][self._num_frames - 1] = extra_data[key] + + def write_frame(self, processed_data: Dict[str, Any]) -> None: + """ + Writes one data frame to the HDF5 file. + + Arguments: + + processed_data: A dictionary containing the data to write in the HDF5 file. 
+ """ + # Datasets to write: + fields: Set[str] = set(processed_data.keys()) & self._requested_datasets + + extra_group_name: str + if self._num_frames == 0: + for extra_group_name in self._extra_groups: + if extra_group_name in fields: + self._create_extra_datasets( + extra_group_name, processed_data[extra_group_name] + ) + + self._resize_datasets() + frame_num: int = self._num_frames - 1 + dataset_dict_keys_to_write: List[ + Literal[ + "detector_data", + "event_id", + "timestamp", + "beam_energy", + "detector_distance", + "post_sample_intensity", + "optical_laser_active", + "q", + "radial", + "errors", + "image_sum", + "frame_is_droplet", + "frame_is_crystal" + ] + ] = [ + "detector_data", + "event_id", + "timestamp", + "beam_energy", + "detector_distance", + "post_sample_intensity", + "optical_laser_active", + "q", + "radial", + "errors", + "image_sum", + "frame_is_droplet", + "frame_is_crystal" + ] + dataset_dict_key: str + for dataset_dict_key in dataset_dict_keys_to_write: + if dataset_dict_key in fields: + self._resizable_datasets[dataset_dict_key][frame_num] = processed_data[ + dataset_dict_key + ] + + if "peak_list" in fields: + peak_list: cryst_algs.TypePeakList = processed_data["peak_list"] + n_peaks: int = min(peak_list["num_peaks"], self._max_num_peaks) + self._resizable_datasets["npeaks"][frame_num] = n_peaks + peak_dict_keys_to_write: List[ + Literal[ + "fs", "ss", "intensity", "num_pixels", "max_pixel_intensity", "snr" + ] + ] = ["fs", "ss", "intensity", "num_pixels", "max_pixel_intensity", "snr"] + peak_dict_key: str + for peak_dict_key in peak_dict_keys_to_write: + self._resizable_datasets[peak_dict_key][ + frame_num, :n_peaks + ] = peak_list[peak_dict_key][:n_peaks] + + for extra_group_name in self._extra_groups: + if extra_group_name in fields: + self._write_extra_data( + extra_group_name, + processed_data[extra_group_name], + ) + + def close(self) -> None: + """ + Closes the file being written. + """ + self._h5file.close() + print( + "{0} frames saved in {1} file.".format( + self._num_frames, self._processed_filename + ) + ) + sys.stdout.flush() + + def get_current_filename(self) -> pathlib.Path: + """ + Retrieves the path to the file being written. + + Returns: + + The path to the file currently being written. + """ + return self._processed_filename + + def get_num_written_frames(self) -> int: + """ + Retrieves the number of already written frames. + + Returns: + + The number of frames already written in the current file. 
+ """ + return self._num_frames - 1 + + def _resize_datasets(self, extension_size: int = 1) -> None: + # Extends all resizable datasets by the specified extension size + dataset: Any + for dataset in self._resizable_datasets.values(): + dataset.resize(self._num_frames + extension_size, axis=0) + self._num_frames += extension_size class SumHDF5Writer: """ diff --git a/src/om/processing_layer/__init__.py b/src/om/processing_layer/__init__.py index dfe5e2dd..dec6a8ac 100644 --- a/src/om/processing_layer/__init__.py +++ b/src/om/processing_layer/__init__.py @@ -27,4 +27,5 @@ from .crystallography import CrystallographyProcessing # noqa: F401 from .testing import TestProcessing # noqa: F401 from .xes import XesProcessing # noqa: F401 -from .swaxs import SwaxsProcessing \ No newline at end of file +from .swaxs import SwaxsProcessing +from .swaxs import SwaxsCheetahProcessing diff --git a/src/om/processing_layer/swaxs.py b/src/om/processing_layer/swaxs.py index 5aecfa70..a1e32e88 100644 --- a/src/om/processing_layer/swaxs.py +++ b/src/om/processing_layer/swaxs.py @@ -28,7 +28,7 @@ from numpy.typing import NDArray from om.algorithms.generic import Binning, BinningPassthrough -from om.lib.cheetah import HDF5Writer +from om.lib.cheetah import SwaxsHDF5Writer from om.lib.event_management import EventCounter from om.lib.geometry import DataVisualizer, GeometryInformation, TypePixelMaps from om.lib.parameters import MonitorParameters, get_parameter_from_parameter_group @@ -257,13 +257,13 @@ def process_data( data=data["detector_data"], beam_energy=data["beam_energy"], detector_distance=data["detector_distance"], - downstream_intensity=1.0 #data["post_sample_intensity"], + downstream_intensity=data["post_sample_intensity"], ) processed_data["radial_profile"] = radial_profile processed_data["detector_data_sum"] = detector_data_sum processed_data["q"] = q - processed_data["downstream_intensity"] = 1.0 #data["post_sample_intensity"] + processed_data["downstream_intensity"] = data["post_sample_intensity"] processed_data["roi1_intensity"] = roi1_intensity processed_data["roi2_intensity"] = roi2_intensity processed_data["sample_detected"] = sample_detected @@ -551,6 +551,11 @@ def __init__(self, *, monitor_parameters: MonitorParameters) -> None: ) ) + self._radial_profile_analysis: RadialProfileAnalysis = RadialProfileAnalysis( + geometry_information=self._geometry_information, + radial_parameters=self._monitor_params.get_parameter_group(group="radial"), + ) + def initialize_processing_node( self, *, node_rank: int, node_pool_size: int ) -> None: @@ -580,10 +585,6 @@ def initialize_processing_node( parameter_type=float, required=True, ) - self._radial_profile_analysis: RadialProfileAnalysis = RadialProfileAnalysis( - geometry_information=self._geometry_information, - radial_parameters=self._monitor_params.get_parameter_group(group="radial"), - ) # Frame sending self._send_hit_frame: bool = False @@ -619,12 +620,19 @@ def initialize_collecting_node( radial_parameters=self._monitor_params.get_parameter_group(group="radial"), ) + radial_bin_labels = self._radial_profile_analysis._radial_profile.get_radial_bin_labels() + radial_bin_centers = self._radial_profile_analysis._radial_profile.calculate_profile( + data=radial_bin_labels + ) + num_radial_bins = len(radial_bin_centers) + # File Writing - self._writer = HDF5Writer( + self._writer = SwaxsHDF5Writer( node_rank=node_rank, cheetah_parameters=self._monitor_params.get_parameter_group( group="radial_cheetah" ), + num_radial_bins=num_radial_bins, ) # Event 
counting @@ -688,6 +696,7 @@ def process_data( detector_data_sum: float ( radial_profile, + errors, q, sample_detected, roi1_intensity, @@ -698,13 +707,13 @@ def process_data( data=data["detector_data"], beam_energy=data["beam_energy"], detector_distance=data["detector_distance"], - downstream_intensity=1.0 #data["post_sample_intensity"], + downstream_intensity=data["post_sample_intensity"], ) processed_data["radial_profile"] = radial_profile processed_data["detector_data_sum"] = detector_data_sum processed_data["q"] = q - processed_data["downstream_intensity"] = 1.0 #data["post_sample_intensity"] + processed_data["post_sample_intensity"] = data["post_sample_intensity"] processed_data["roi1_intensity"] = roi1_intensity processed_data["roi2_intensity"] = roi2_intensity processed_data["sample_detected"] = sample_detected @@ -714,6 +723,7 @@ def process_data( processed_data["event_id"] = data["event_id"] processed_data["rg"] = rg + return (processed_data, node_rank) def wait_for_data( @@ -791,16 +801,16 @@ def collect_data( roi2_intensity_history, hit_rate_history, rg_history, + cumulative_hits_radial, ) = self._plots.update_plots( radial_profile=received_data["radial_profile"], detector_data_sum=received_data["detector_data_sum"], q=received_data["q"], - downstream_intensity=received_data["downstream_intensity"], + downstream_intensity=received_data["post_sample_intensity"], roi1_intensity=received_data["roi1_intensity"], roi2_intensity=received_data["roi2_intensity"], sample_detected=received_data["sample_detected"], rg=received_data["rg"], - frame_sum=received_data["frame_sum"], ) # Event counting @@ -817,6 +827,7 @@ def collect_data( "timestamp": received_data["timestamp"], "sample_detected": received_data["sample_detected"], "detector_distance": received_data["detector_distance"], + "post_sample_intensity": received_data["post_sample_intensity"], "beam_energy": received_data["beam_energy"], "event_id": received_data["event_id"], }
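For reference, below is a minimal standalone sketch of how the new SwaxsHDF5Writer might be exercised outside of the SWAXS collecting node. The writer class, its constructor arguments and the write_frame()/close() calls come from this patch; the contents of the parameter group, the HDF5 field paths, the output directory and the fabricated frame values are illustrative assumptions only, not taken from the patch.

import pathlib
from typing import Any, Dict

import numpy

from om.lib.cheetah import SwaxsHDF5Writer

# Hypothetical stand-in for the "radial_cheetah" parameter group: only the keys that
# SwaxsHDF5Writer.__init__ reads via get_parameter_from_parameter_group are set here.
cheetah_parameters: Dict[str, Any] = {
    "processed_directory": "./processed",
    "processed_filename_prefix": "swaxs",
    "hdf5_file_compression": "gzip",
    "hdf5_file_gzip_compression_level": 4,
    "hdf5_fields": {
        "q": "/data/q",
        "radial": "/data/radial",
        "errors": "/data/errors",
        "timestamp": "/data/timestamp",
        "event_id": "/data/event_id",
    },
}

num_radial_bins: int = 1000
pathlib.Path(cheetah_parameters["processed_directory"]).mkdir(exist_ok=True)

writer: SwaxsHDF5Writer = SwaxsHDF5Writer(
    node_rank=0,
    cheetah_parameters=cheetah_parameters,
    num_radial_bins=num_radial_bins,
)

# One frame of reduced data, using the same keys that write_frame recognizes. No
# "detector_data" entry is requested above, so no full detector-frame dataset is
# created and the frame itself does not need to be passed.
writer.write_frame(
    {
        "q": numpy.linspace(0.0, 4.0, num_radial_bins),
        "radial": numpy.ones(num_radial_bins),
        "errors": numpy.zeros(num_radial_bins),
        "timestamp": 1714600000.0,
        "event_id": "run_0001//0",
    }
)
writer.close()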