From 2e81b6bd238c61c1757d18ac9f3d35c2a75da7a4 Mon Sep 17 00:00:00 2001 From: Davide Curcio Date: Wed, 11 Oct 2023 14:46:32 +0200 Subject: [PATCH 1/7] add k warping function --- sed/calibrator/hextof.py | 72 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py index 58e2aee2..31e70243 100644 --- a/sed/calibrator/hextof.py +++ b/sed/calibrator/hextof.py @@ -152,3 +152,75 @@ def convert_to_ns(x): "tof_binwidth": tof_binwidth } return df, metadata + + + + +def calibrate_k( + df: Union[pd.DataFrame, dask.dataframe.DataFrame], + warp_params: Sequence[float] = None, + x_column: str = "dldPosX", + y_column: str = "dldPosY", + kx_column: str = "kx", + ky_column: str = "ky", + config: dict = None, +) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: + """Converts the 8s time in steps to time in ns. + + Args: + warp_params (Sequence[float], optional): warping parameters. + this function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that (warp_params[6],warp_params[7]) needs to go to (0,0) + it uses a radial distortion model called division model (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts + warp_params[0],warp_params[1] center of distortion in px + warp_params[6],warp_params[7] normal emission (Gamma) in px + warp_params[2],warp_params[3],warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + warp_params[5] rotation in rad + Defaults to config["dataframe"]["warp_params"]. + x_column (str, optional): Name of the column containing the + x steps. Defaults to config["dataframe"]["x_column"]. + y_column (str, optional): Name of the column containing the + y steps. Defaults to config["dataframe"]["y_column"]. + kx_column (str, optional): Name of the column containing the + x steps. Defaults to config["dataframe"]["kx_column"]. + ky_column (str, optional): Name of the column containing the + y steps. Defaults to config["dataframe"]["ky_column"]. + """ + if warp_params is None: + if config is None: + raise ValueError("Either warp_params or config must be given.") + warp_params: float = config["dataframe"]["warp_params"] + if x_column is None: + if config is None: + raise ValueError("Either x_column or config must be given.") + x_column: str = config["dataframe"]["x_column"] + if kx_column is None: + if config is None: + raise ValueError("Either kx_column or config must be given.") + kx_column: str = config["dataframe"]["kx_column"] + if y_column is None: + if config is None: + raise ValueError("Either y_column or config must be given.") + y_column: str = config["dataframe"]["y_column"] + if ky_column is None: + if config is None: + raise ValueError("Either ky_column or config must be given.") + ky_column: str = config["dataframe"]["ky_column"] + + def convert_to_kx(x): + return np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),2)+warp_params[4]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),4)) * np.cos(np.arctan2(x[y_column]-warp_params[1],x[x_column]-warp_params[0])-warp_params[5])-np.sqrt(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))+warp_params[4]*np.power((np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2)),2))*np.cos(np.arctan2(warp_params[7]-warp_params[1],warp_params[6]-warp_params[0])-warp_params[5]) + def convert_to_ky(x): + return np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),2)+warp_params[4]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),4)) * np.sin(np.arctan2(x[y_column]-warp_params[1],x[x_column]-warp_params[0])-warp_params[5])-np.sqrt(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))+warp_params[4]*np.power((np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2)),2))*np.sin(np.arctan2(warp_params[7]-warp_params[1],warp_params[6]-warp_params[0])-warp_params[5]) + df[kx_column] = df.map_partitions( + convert_to_kx, meta=(kx_column, np.float64) + ) + df[ky_column] = df.map_partitions( + convert_to_ky, meta=(ky_column, np.float64) + ) + + metadata = {} + metadata["applied"] = True + metadata["warp_params"] = warp_params + + return df, metadata \ No newline at end of file From f89f2d308c96314a3b47354ee0e575ac97f446d2 Mon Sep 17 00:00:00 2001 From: Davide Curcio Date: Wed, 11 Oct 2023 14:57:08 +0200 Subject: [PATCH 2/7] better comments --- sed/calibrator/hextof.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py index 31e70243..9d9d5f11 100644 --- a/sed/calibrator/hextof.py +++ b/sed/calibrator/hextof.py @@ -165,14 +165,13 @@ def calibrate_k( ky_column: str = "ky", config: dict = None, ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: - """Converts the 8s time in steps to time in ns. + """ This function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that (warp_params[6],warp_params[7]) needs to go to (0,0) + it uses a radial distortion model called division model (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts. Args: warp_params (Sequence[float], optional): warping parameters. - this function returns the distorted coordinates given the undistorted ones - a little complicated by the fact that (warp_params[6],warp_params[7]) needs to go to (0,0) - it uses a radial distortion model called division model (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) - commonly used to correct for lens artifacts warp_params[0],warp_params[1] center of distortion in px warp_params[6],warp_params[7] normal emission (Gamma) in px warp_params[2],warp_params[3],warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) From 6c8a76222fd0fddd6a95b489154990d0d3a5a8ad Mon Sep 17 00:00:00 2001 From: Davide Curcio Date: Wed, 11 Oct 2023 15:02:04 +0200 Subject: [PATCH 3/7] change new function to more descriptive name --- sed/calibrator/hextof.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py index 9d9d5f11..bd93beef 100644 --- a/sed/calibrator/hextof.py +++ b/sed/calibrator/hextof.py @@ -156,7 +156,7 @@ def convert_to_ns(x): -def calibrate_k( +def calibrate_k_division_model( df: Union[pd.DataFrame, dask.dataframe.DataFrame], warp_params: Sequence[float] = None, x_column: str = "dldPosX", From 943088d28ddecf2338d86c218fcb97a3b79cc0e4 Mon Sep 17 00:00:00 2001 From: Steinn Ymir Agustsson Date: Wed, 11 Oct 2023 21:30:37 +0200 Subject: [PATCH 4/7] refactor readability --- sed/calibrator/hextof.py | 52 ++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py index bd93beef..c0297664 100644 --- a/sed/calibrator/hextof.py +++ b/sed/calibrator/hextof.py @@ -154,8 +154,6 @@ def convert_to_ns(x): return df, metadata - - def calibrate_k_division_model( df: Union[pd.DataFrame, dask.dataframe.DataFrame], warp_params: Sequence[float] = None, @@ -165,16 +163,20 @@ def calibrate_k_division_model( ky_column: str = "ky", config: dict = None, ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: - """ This function returns the distorted coordinates given the undistorted ones - a little complicated by the fact that (warp_params[6],warp_params[7]) needs to go to (0,0) - it uses a radial distortion model called division model (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) - commonly used to correct for lens artifacts. + """ K calibration based on the division model + + This function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that (warp_params[6],warp_params[7]) needs to + go to (0,0) + it uses a radial distortion model called division model + (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts. Args: warp_params (Sequence[float], optional): warping parameters. warp_params[0],warp_params[1] center of distortion in px warp_params[6],warp_params[7] normal emission (Gamma) in px - warp_params[2],warp_params[3],warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + warp_params[2],warp_params[3], warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) warp_params[5] rotation in rad Defaults to config["dataframe"]["warp_params"]. x_column (str, optional): Name of the column containing the @@ -207,10 +209,30 @@ def calibrate_k_division_model( raise ValueError("Either ky_column or config must be given.") ky_column: str = config["dataframe"]["ky_column"] - def convert_to_kx(x): - return np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),2)+warp_params[4]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),4)) * np.cos(np.arctan2(x[y_column]-warp_params[1],x[x_column]-warp_params[0])-warp_params[5])-np.sqrt(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))+warp_params[4]*np.power((np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2)),2))*np.cos(np.arctan2(warp_params[7]-warp_params[1],warp_params[6]-warp_params[0])-warp_params[5]) + wp = warp_params + def convert_to_kx(x): + """Converts the x steps to kx.""" + x_diff = x[x_column] - wp[0] + y_diff = x[y_column] - wp[1] + dist = np.sqrt(x_diff**2 + y_diff**2) + den = wp[2] + wp[3]*dist**2 + wp[4]*dist**4 + angle = np.arctan2(y_diff, x_diff) - wp[5] + warp_diff = np.sqrt((wp[6] - wp[0])**2 + (wp[7] - wp[1])**2) + warp_den = wp[2] + wp[3]*(wp[6] - wp[0])**2 + wp[4]*(wp[7] - wp[1])**2 + warp_angle = np.arctan2(wp[7] - wp[1], wp[6] - wp[0]) - wp[5] + return (dist/den)*np.cos(angle) - (warp_diff/warp_den)*np.cos(warp_angle) + def convert_to_ky(x): - return np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),2)+warp_params[4]*np.power(np.sqrt(np.power((x[x_column]-warp_params[0]),2)+np.power((x[y_column]-warp_params[1]),2)),4)) * np.sin(np.arctan2(x[y_column]-warp_params[1],x[x_column]-warp_params[0])-warp_params[5])-np.sqrt(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))/(warp_params[2]+warp_params[3]*(np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2))+warp_params[4]*np.power((np.power((warp_params[6]-warp_params[0]),2)+np.power((warp_params[7]-warp_params[1]),2)),2))*np.sin(np.arctan2(warp_params[7]-warp_params[1],warp_params[6]-warp_params[0])-warp_params[5]) + x_diff = x[x_column] - wp[0] + y_diff = x[y_column] - wp[1] + dist = np.sqrt(x_diff**2 + y_diff**2) + den = wp[2] + wp[3]*dist**2 + wp[4]*dist**4 + angle = np.arctan2(y_diff, x_diff) - wp[5] + warp_diff = np.sqrt((wp[6] - wp[0])**2 + (wp[7] - wp[1])**2) + warp_den = wp[2] + wp[3]*(wp[6] - wp[0])**2 + wp[4]*(wp[7] - wp[1])**2 + warp_angle = np.arctan2(wp[7] - wp[1], wp[6] - wp[0]) - wp[5] + return (dist/den)*np.sin(angle) - (warp_diff/warp_den)*np.sin(warp_angle) + df[kx_column] = df.map_partitions( convert_to_kx, meta=(kx_column, np.float64) ) @@ -218,8 +240,8 @@ def convert_to_ky(x): convert_to_ky, meta=(ky_column, np.float64) ) - metadata = {} - metadata["applied"] = True - metadata["warp_params"] = warp_params - - return df, metadata \ No newline at end of file + metadata = { + "applied": True, + "warp_params": warp_params, + } + return df, metadata From 2eaec062b1f880a30d9a066045b80e88bbf13015 Mon Sep 17 00:00:00 2001 From: Steinn Ymir Agustsson Date: Wed, 11 Oct 2023 21:36:34 +0200 Subject: [PATCH 5/7] add wrapper func in processor --- sed/calibrator/hextof.py | 8 +++---- sed/core/processor.py | 48 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py index c0297664..adf73435 100644 --- a/sed/calibrator/hextof.py +++ b/sed/calibrator/hextof.py @@ -157,10 +157,10 @@ def convert_to_ns(x): def calibrate_k_division_model( df: Union[pd.DataFrame, dask.dataframe.DataFrame], warp_params: Sequence[float] = None, - x_column: str = "dldPosX", - y_column: str = "dldPosY", - kx_column: str = "kx", - ky_column: str = "ky", + x_column: str = None, + y_column: str = None, + kx_column: str = None, + ky_column: str = None, config: dict = None, ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: """ K calibration based on the division model diff --git a/sed/core/processor.py b/sed/core/processor.py index 15ed267c..7c80f2d1 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1271,6 +1271,54 @@ def align_dld_sectors( duplicate_policy="merge", ) + def calibrate_k_division_model( + self, + warp_params: Sequence[float] = None, + x_column: str = None, + y_column: str = None, + kx_column: str = None, + ky_column: str = None, + ) -> None: + """Calibrate k space using the division model. + + This function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that (warp_params[6],warp_params[7]) needs to + go to (0,0) + it uses a radial distortion model called division model + (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts. + + Args: + warp_params (Sequence[float], optional): warping parameters. + warp_params[0],warp_params[1] center of distortion in px + warp_params[6],warp_params[7] normal emission (Gamma) in px + warp_params[2],warp_params[3], warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + warp_params[5] rotation in rad + Defaults to config["dataframe"]["warp_params"]. + x_column (str, optional): Name of the column containing the + x steps. Defaults to config["dataframe"]["x_column"]. + y_column (str, optional): Name of the column containing the + y steps. Defaults to config["dataframe"]["y_column"]. + kx_column (str, optional): Name of the column containing the + x steps. Defaults to config["dataframe"]["kx_column"]. + ky_column (str, optional): Name of the column containing the + y steps. Defaults to config["dataframe"]["ky_column"]. + """ + self._dataframe, metadata = hextof.calibrate_k_division_model( + df=self._dataframe, + warp_params=warp_params, + x_column=x_column, + y_column=y_column, + kx_column=kx_column, + ky_column=ky_column, + config=self._config, + ) + self._attributes.add( + metadata, + "k_division_model", + duplicate_policy="raise", + ) + def pre_binning( self, df_partitions: int = 100, From 757a8e182efff38af8464e9deca910c43db840de Mon Sep 17 00:00:00 2001 From: Steinn Ymir Agustsson Date: Thu, 16 Nov 2023 23:32:36 +0100 Subject: [PATCH 6/7] incorporate division model in momenutm calibrator --- sed/calibrator/hextof.py | 247 ------------------------------------- sed/calibrator/momentum.py | 199 +++++++++++++++++++++++++++++- sed/core/processor.py | 156 +++++++++-------------- 3 files changed, 258 insertions(+), 344 deletions(-) delete mode 100644 sed/calibrator/hextof.py diff --git a/sed/calibrator/hextof.py b/sed/calibrator/hextof.py deleted file mode 100644 index adf73435..00000000 --- a/sed/calibrator/hextof.py +++ /dev/null @@ -1,247 +0,0 @@ -"""sed.calibrator.hextof module. Code for handling hextof specific transformations and -calibrations. -""" -from typing import Any -from typing import Dict -from typing import Sequence -from typing import Tuple -from typing import Union - -import numpy as np -import pandas as pd -import dask.dataframe - - -def unravel_8s_detector_time_channel( - df: dask.dataframe.DataFrame, - tof_column: str = None, - sector_id_column: str = None, - config: dict = None, -) -> dask.dataframe.DataFrame: - """Converts the 8s time in steps to time in steps and sectorID. - - The 8s detector encodes the dldSectorID in the 3 least significant bits of the - dldTimeSteps channel. - - Args: - tof_column (str, optional): Name of the column containing the - time-of-flight steps. Defaults to config["dataframe"]["tof_column"]. - sector_id_column (str, optional): Name of the column containing the - sectorID. Defaults to config["dataframe"]["sector_id_column"]. - config (dict, optional): Configuration dictionary. Defaults to None. - - Returns: - dask.dataframe.DataFrame: Dataframe with the new columns. - """ - if tof_column is None: - if config is None: - raise ValueError("Either tof_column or config must be given.") - tof_column = config["dataframe"]["tof_column"] - if sector_id_column is None: - if config is None: - raise ValueError("Either sector_id_column or config must be given.") - sector_id_column = config["dataframe"]["sector_id_column"] - - if sector_id_column in df.columns: - raise ValueError(f"Column {sector_id_column} already in dataframe. " - "This function is not idempotent.") - df[sector_id_column] = (df[tof_column] % 8).astype(np.int8) - df[tof_column] = (df[tof_column] // 8).astype(np.int32) - return df - - -def align_dld_sectors( - df: dask.dataframe.DataFrame, - sector_delays: Sequence[float] = None, - sector_id_column: str = None, - tof_column: str = None, - config: dict = None, -) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: - """Aligns the 8s sectors to the first sector. - - Args: - sector_delays (Sequece[float], optional): Sector delays for the 8s time. - in units of step. Calibration should be done with binning along dldTimeSteps. - Defaults to config["dataframe"]["sector_delays"]. - sector_id_column (str, optional): Name of the column containing the - sectorID. Defaults to config["dataframe"]["sector_id_column"]. - tof_column (str, optional): Name of the column containing the - time-of-flight. Defaults to config["dataframe"]["tof_column"]. - config (dict, optional): Configuration dictionary. Defaults to None. - - Returns: - dask.dataframe.DataFrame: Dataframe with the new columns. - dict: Metadata dictionary. - """ - if sector_delays is None: - if config is None: - raise ValueError("Either sector_delays or config must be given.") - sector_delays = config["dataframe"]["sector_delays"] - if sector_id_column is None: - if config is None: - raise ValueError("Either sector_id_column or config must be given.") - sector_id_column = config["dataframe"]["sector_id_column"] - if tof_column is None: - if config is None: - raise ValueError("Either tof_column or config must be given.") - tof_column = config["dataframe"]["tof_column"] - # align the 8s sectors - sector_delays_arr = dask.array.from_array(sector_delays) - - def align_sector(x): - val = x[tof_column] - sector_delays_arr[x[sector_id_column].values.astype(int)] - return val.astype(np.float32) - df[tof_column] = df.map_partitions( - align_sector, meta=(tof_column, np.float32) - ) - metadata: Dict[str,Any] = { - "applied": True, - "sector_delays": sector_delays, - } - return df, metadata - - -def dld_time_to_ns( - df: Union[pd.DataFrame, dask.dataframe.DataFrame], - tof_ns_column: str = None, - tof_binwidth: float = None, - tof_column: str = None, - tof_binning: int = None, - config: dict = None, -) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: - """Converts the 8s time in steps to time in ns. - - Args: - tof_binwidth (float, optional): Time step size in nanoseconds. - Defaults to config["dataframe"]["tof_binwidth"]. - tof_column (str, optional): Name of the column containing the - time-of-flight steps. Defaults to config["dataframe"]["tof_column"]. - tof_column (str, optional): Name of the column containing the - time-of-flight. Defaults to config["dataframe"]["tof_column"]. - tof_binning (int, optional): Binning of the time-of-flight steps. - - Returns: - dask.dataframe.DataFrame: Dataframe with the new columns. - dict: Metadata dictionary. - """ - if tof_binwidth is None: - if config is None: - raise ValueError("Either tof_binwidth or config must be given.") - tof_binwidth = config["dataframe"]["tof_binwidth"] - if tof_column is None: - if config is None: - raise ValueError("Either tof_column or config must be given.") - tof_column = config["dataframe"]["tof_column"] - if tof_binning is None: - if config is None: - raise ValueError("Either tof_binning or config must be given.") - tof_binning = config["dataframe"]["tof_binning"] - if tof_ns_column is None: - if config is None: - raise ValueError("Either tof_ns_column or config must be given.") - tof_ns_column = config["dataframe"]["tof_ns_column"] - - def convert_to_ns(x): - val = x[tof_column] * tof_binwidth * 2**tof_binning - return val.astype(np.float32) - df[tof_ns_column] = df.map_partitions( - convert_to_ns, meta=(tof_column, np.float32) - ) - metadata: Dict[str,Any] = { - "applied": True, - "tof_binwidth": tof_binwidth - } - return df, metadata - - -def calibrate_k_division_model( - df: Union[pd.DataFrame, dask.dataframe.DataFrame], - warp_params: Sequence[float] = None, - x_column: str = None, - y_column: str = None, - kx_column: str = None, - ky_column: str = None, - config: dict = None, -) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: - """ K calibration based on the division model - - This function returns the distorted coordinates given the undistorted ones - a little complicated by the fact that (warp_params[6],warp_params[7]) needs to - go to (0,0) - it uses a radial distortion model called division model - (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) - commonly used to correct for lens artifacts. - - Args: - warp_params (Sequence[float], optional): warping parameters. - warp_params[0],warp_params[1] center of distortion in px - warp_params[6],warp_params[7] normal emission (Gamma) in px - warp_params[2],warp_params[3], warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) - warp_params[5] rotation in rad - Defaults to config["dataframe"]["warp_params"]. - x_column (str, optional): Name of the column containing the - x steps. Defaults to config["dataframe"]["x_column"]. - y_column (str, optional): Name of the column containing the - y steps. Defaults to config["dataframe"]["y_column"]. - kx_column (str, optional): Name of the column containing the - x steps. Defaults to config["dataframe"]["kx_column"]. - ky_column (str, optional): Name of the column containing the - y steps. Defaults to config["dataframe"]["ky_column"]. - """ - if warp_params is None: - if config is None: - raise ValueError("Either warp_params or config must be given.") - warp_params: float = config["dataframe"]["warp_params"] - if x_column is None: - if config is None: - raise ValueError("Either x_column or config must be given.") - x_column: str = config["dataframe"]["x_column"] - if kx_column is None: - if config is None: - raise ValueError("Either kx_column or config must be given.") - kx_column: str = config["dataframe"]["kx_column"] - if y_column is None: - if config is None: - raise ValueError("Either y_column or config must be given.") - y_column: str = config["dataframe"]["y_column"] - if ky_column is None: - if config is None: - raise ValueError("Either ky_column or config must be given.") - ky_column: str = config["dataframe"]["ky_column"] - - wp = warp_params - def convert_to_kx(x): - """Converts the x steps to kx.""" - x_diff = x[x_column] - wp[0] - y_diff = x[y_column] - wp[1] - dist = np.sqrt(x_diff**2 + y_diff**2) - den = wp[2] + wp[3]*dist**2 + wp[4]*dist**4 - angle = np.arctan2(y_diff, x_diff) - wp[5] - warp_diff = np.sqrt((wp[6] - wp[0])**2 + (wp[7] - wp[1])**2) - warp_den = wp[2] + wp[3]*(wp[6] - wp[0])**2 + wp[4]*(wp[7] - wp[1])**2 - warp_angle = np.arctan2(wp[7] - wp[1], wp[6] - wp[0]) - wp[5] - return (dist/den)*np.cos(angle) - (warp_diff/warp_den)*np.cos(warp_angle) - - def convert_to_ky(x): - x_diff = x[x_column] - wp[0] - y_diff = x[y_column] - wp[1] - dist = np.sqrt(x_diff**2 + y_diff**2) - den = wp[2] + wp[3]*dist**2 + wp[4]*dist**4 - angle = np.arctan2(y_diff, x_diff) - wp[5] - warp_diff = np.sqrt((wp[6] - wp[0])**2 + (wp[7] - wp[1])**2) - warp_den = wp[2] + wp[3]*(wp[6] - wp[0])**2 + wp[4]*(wp[7] - wp[1])**2 - warp_angle = np.arctan2(wp[7] - wp[1], wp[6] - wp[0]) - wp[5] - return (dist/den)*np.sin(angle) - (warp_diff/warp_den)*np.sin(warp_angle) - - df[kx_column] = df.map_partitions( - convert_to_kx, meta=(kx_column, np.float64) - ) - df[ky_column] = df.map_partitions( - convert_to_ky, meta=(ky_column, np.float64) - ) - - metadata = { - "applied": True, - "warp_params": warp_params, - } - return df, metadata diff --git a/sed/calibrator/momentum.py b/sed/calibrator/momentum.py index 7fb585de..1070a62f 100644 --- a/sed/calibrator/momentum.py +++ b/sed/calibrator/momentum.py @@ -6,6 +6,7 @@ from typing import Any from typing import Dict from typing import List +from typing import Sequence from typing import Tuple from typing import Union @@ -107,7 +108,10 @@ def __init__( self.correction: Dict[Any, Any] = {"applied": False} self.adjust_params: Dict[Any, Any] = {"applied": False} self.calibration: Dict[Any, Any] = {} - + self.division_model_params: Dict[str, Any] = self._config["momentum"].get( + "division_model_params", + {}, + ) self.x_column = self._config["dataframe"]["x_column"] self.y_column = self._config["dataframe"]["y_column"] self.corrected_x_column = self._config["dataframe"]["corrected_x_column"] @@ -1837,6 +1841,122 @@ def gather_calibration_metadata(self, calibration: dict = None) -> dict: return metadata + def calibrate_k_division_model( + self, + df: Union[pd.DataFrame, dask.dataframe.DataFrame], + warp_params: Union[dict[str, Any], Sequence[float]] = None, + x_column: str = None, + y_column: str = None, + kx_column: str = None, + ky_column: str = None, + ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: + """Use the division model to calibrate the momentum axis. + + This function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that gamma needs to go to (0,0). + it uses a radial distortion model called division model + (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts. + + The radial distortion parameters k0, k1, k2 are defined as follows: + .. math:: + K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + where rpx is the distance from the center of distortion in pixels and rk is the + distance from the center of distortion in k space. + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to apply the + distotion correction to. + warp_params (Sequence[float], optional): Parameters of the division model. + Either a dictionary containing the parameters or a sequence of the + parameters in the order ['center','k0','k1','k2','gamma']. + center and gamma are both 2D vectors, k0, k1 and k2 are scalars. + Defaults to config["momentum"]["division_model_params"]. + x_column (str, optional): Label of the source 'X' column. + Defaults to config["momentum"]["x_column"]. + y_column (str, optional): Label of the source 'Y' column. + Defaults to config["momentum"]["y_column"]. + kx_column (str, optional): Label of the destination 'X' column after + momentum calibration. Defaults to config["momentum"]["kx_column"]. + ky_column (str, optional): Label of the destination 'Y' column after + momentum calibration. Defaults to config["momentum"]["ky_column"]. + + Returns: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe with added columns + metadata (dict): momentum calibration metadata dictionary. + """ + if x_column is None: + x_column = self.x_column + if y_column is None: + y_column = self.y_column + if kx_column is None: + kx_column = self.kx_column + if ky_column is None: + ky_column = self.ky_column + + if warp_params is None: + warp_params = self.division_model_params + + if isinstance(warp_params, Sequence): + if len(warp_params) != 7: + raise ValueError( + f"Warp parameters must be a sequence of 7 floats! (center, k0, k1, k2, gamma)\n" + f"Got {len(warp_params)} instead", + ) + warp_params = { + "center": np.asarray(warp_params[0:2]), + "k0": warp_params[2], + "k1": warp_params[3], + "k2": warp_params[4], + "gamma": np.asarray(warp_params[5:7]), + } + elif isinstance(warp_params, dict): + if not all(key in warp_params for key in ["center", "k0", "k1", "k2", "gamma"]): + raise ValueError( + f"Warp parameters must be a dictionary containing the keys " + "'center', 'k0', 'k1', 'k2', 'gamma'!\n" + f"Got {warp_params.keys()} instead", + ) + if len(warp_params["center"]) != 2: + raise ValueError( + f"Warp parameter 'center' must be a 2D vector!\n" + f"Got {warp_params['center']} instead", + ) + if len(warp_params["gamma"]) != 2: + raise ValueError( + f"Warp parameter 'gamma' must be a 2D vector!\n" + f"Got {warp_params['gamma']} instead", + ) + if not all( + isinstance(value, (int, float, np.integer, np.floating)) + for value in [warp_params[k] for k in ["k0", "k1", "k2"]] + ): + raise ValueError( + f"Warp parameters 'k0', 'k1' and 'k2' must be floats!\n" + f"Got {warp_params['k0']}, {warp_params['k1']} and {warp_params['k2']} instead", + ) + else: + raise TypeError("Warp parameters must be a dictionary or a sequence of floats!") + + df = calibrate_k_division_model( + df, + x_column=x_column, + y_column=y_column, + kx_column=kx_column, + ky_column=ky_column, + **warp_params, + ) + + metadata = { + "applied": True, + "warp_params": warp_params, + "x_column": x_column, + "y_column": y_column, + "kx_column": kx_column, + "ky_column": ky_column, + } + return df, metadata + def cm2palette(cmap_name: str) -> list: """Convert certain matplotlib colormap (cm) to bokeh palette. @@ -2091,3 +2211,80 @@ def load_dfield(file: str) -> Tuple[np.ndarray, np.ndarray]: pass return rdeform_field, cdeform_field + + +def calibrate_k_division_model( + df: Union[pd.DataFrame, dask.dataframe.DataFrame], + center: Tuple[float, float] = None, + k0: float = None, + k1: float = None, + k2: float = None, + rot: float = None, + gamma: Tuple[float, float] = None, + x_column: str = None, + y_column: str = None, + kx_column: str = None, + ky_column: str = None, +) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: + """K calibration based on the division model + + This function returns the distorted coordinates given the undistorted ones + a little complicated by the fact that gamma needs to go to (0,0). + it uses a radial distortion model called division model + (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) + commonly used to correct for lens artifacts. + + The radial distortion parameters k0, k1, k2 are defined as follows: + .. math:: + K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + where rpx is the distance from the center of distortion in pixels and rk is the + distance from the center of distortion in k space. + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to apply the + distotion correction to. + center (Tuple[float, float]): center of distortion in px + k0 (float): radial distortion parameter + k1 (float): radial distortion parameter + k2 (float): radial distortion parameter + rot (float): rotation in rad + gamma (Tuple[float, float]): normal emission (Gamma) in px + x_column (str): Name of the column containing the x steps. + y_column (str): Name of the column containing the y steps. + kx_column (str, optional): Name of the target calibrated x column. + If None, defaults to x_column. + ky_column (str, optional): Name of the target calibrated x column. + If None, defaults to y_column. + """ + if kx_column is None: + kx_column = x_column + if ky_column is None: + ky_column = y_column + + def convert_to_kx(x): + """Converts the x steps to kx.""" + x_diff = x[x_column] - center[0] + y_diff = x[y_column] - center[1] + dist = np.sqrt(x_diff**2 + y_diff**2) + den = k0 + k1 * dist**2 + k2 * dist**4 + angle = np.arctan2(y_diff, x_diff) - rot + warp_diff = np.sqrt((gamma[0] - center[0]) ** 2 + (gamma[1] - center[1]) ** 2) + warp_den = k0 + k1 * (gamma[0] - center[0]) ** 2 + k2 * (gamma[1] - center[1]) ** 2 + warp_angle = np.arctan2(gamma[1] - center[1], gamma[0] - center[0]) - rot + return (dist / den) * np.cos(angle) - (warp_diff / warp_den) * np.cos(warp_angle) + + def convert_to_ky(x): + x_diff = x[x_column] - center[0] + y_diff = x[y_column] - center[1] + dist = np.sqrt(x_diff**2 + y_diff**2) + den = k0 + k1 * dist**2 + k2 * dist**4 + angle = np.arctan2(y_diff, x_diff) - rot + warp_diff = np.sqrt((gamma[0] - center[0]) ** 2 + (gamma[1] - center[1]) ** 2) + warp_den = k0 + k1 * (gamma[0] - center[0]) ** 2 + k2 * (gamma[1] - center[1]) ** 2 + warp_angle = np.arctan2(gamma[1] - center[1], gamma[0] - center[0]) - rot + return (dist / den) * np.sin(angle) - (warp_diff / warp_den) * np.sin(warp_angle) + + df[kx_column] = df.map_partitions(convert_to_kx, meta=(kx_column, np.float64)) + df[ky_column] = df.map_partitions(convert_to_ky, meta=(ky_column, np.float64)) + + return df diff --git a/sed/core/processor.py b/sed/core/processor.py index 7c80f2d1..c472f194 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -21,7 +21,6 @@ from sed.calibrator import DelayCalibrator from sed.calibrator import EnergyCalibrator from sed.calibrator import MomentumCorrector -from sed.calibrator import hextof from sed.core.config import parse_config from sed.core.config import save_config from sed.core.dfops import apply_jitter @@ -1204,114 +1203,50 @@ def add_jitter(self, cols: Sequence[str] = None): metadata.append(col) self._attributes.add(metadata, "jittering", duplicate_policy="append") - def dld_time_to_ns( - self, - tof_ns_column: str = None, - tof_binwidth: float = None, - tof_column: str = None, - tof_binning: int = None, - ): - """Convert time-of-flight channel steps to nanoseconds. - - Args: - tof_binwidth (float, optional): Time step size in nanoseconds. - Defaults to config["dataframe"]["tof_binwidth"]. - tof_column (str, optional): Name of the column containing the - time-of-flight steps. Defaults to config["dataframe"]["tof_column"]. - tof_column (str, optional): Name of the column containing the - time-of-flight. Defaults to config["dataframe"]["tof_column"]. - tof_binning (int, optional): Binning of the time-of-flight steps. - - """ - if self._dataframe is not None: - print("Adding energy column to dataframe:") - # TODO assert order of execution through metadata - - self._dataframe, metadata = hextof.dld_time_to_ns( - df=self._dataframe, - tof_ns_column=tof_ns_column, - tof_binwidth=tof_binwidth, - tof_column=tof_column, - tof_binning=tof_binning, - config=self._config, - ) - self._attributes.add( - metadata, - "energy_calibration", - duplicate_policy="merge", - ) - - def align_dld_sectors( - self, - sector_delays: Sequence[float] = None, - sector_id_column: str = None, - tof_column: str = None, - ): - """ Align the 8s sectors of the HEXTOF endstation. - - Intended for use with HEXTOF endstation - - Args: - sector_delays (Sequence[float], optional): Delays of the 8s sectors in - picoseconds. Defaults to config["dataframe"]["sector_delays"]. - """ - if self._dataframe is not None: - print("Aligning 8s sectors of dataframe") - # TODO assert order of execution through metadata - self._dataframe, metadata = hextof.align_dld_sectors( - df=self._dataframe, - sector_delays=sector_delays, - sector_id_column=sector_id_column, - tof_column=tof_column, - config=self._config, - ) - self._attributes.add( - metadata, - "sector_alignment", - duplicate_policy="merge", - ) - def calibrate_k_division_model( self, warp_params: Sequence[float] = None, - x_column: str = None, - y_column: str = None, - kx_column: str = None, - ky_column: str = None, + **kwargs, ) -> None: - """Calibrate k space using the division model. + """Use the division model to calibrate the momentum axis. This function returns the distorted coordinates given the undistorted ones - a little complicated by the fact that (warp_params[6],warp_params[7]) needs to - go to (0,0) - it uses a radial distortion model called division model + a little complicated by the fact that gamma needs to go to (0,0). + it uses a radial distortion model called division model (https://en.wikipedia.org/wiki/Distortion_(optics)#Software_correction) commonly used to correct for lens artifacts. + The radial distortion parameters k0, k1, k2 are defined as follows: + .. math:: + K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) + where rpx is the distance from the center of distortion in pixels and rk is the + distance from the center of distortion in k space. + Args: - warp_params (Sequence[float], optional): warping parameters. - warp_params[0],warp_params[1] center of distortion in px - warp_params[6],warp_params[7] normal emission (Gamma) in px - warp_params[2],warp_params[3], warp_params[4] K_n; rk = rpx/(K_0 + K_1*rpx^2 + K_2*rpx^4) - warp_params[5] rotation in rad - Defaults to config["dataframe"]["warp_params"]. - x_column (str, optional): Name of the column containing the - x steps. Defaults to config["dataframe"]["x_column"]. - y_column (str, optional): Name of the column containing the - y steps. Defaults to config["dataframe"]["y_column"]. - kx_column (str, optional): Name of the column containing the - x steps. Defaults to config["dataframe"]["kx_column"]. - ky_column (str, optional): Name of the column containing the - y steps. Defaults to config["dataframe"]["ky_column"]. + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to apply the + distotion correction to. + warp_params (Sequence[float], optional): Parameters of the division model. + Either a dictionary containing the parameters or a sequence of the + parameters in the order ['center','k0','k1','k2','gamma']. + center and gamma are both 2D vectors, k0, k1 and k2 are scalars. + Center is the center of distortion in pixels, gamma is the center of + the image in k space. k0, k1 and k2 are the radial distortion parameters. + Defaults to config["momentum"]["division_model_params"]. + kwargs: Keyword arguments passed to ``calibrate_k_division_model``: + x_column (str, optional): Label of the source 'X' column. + Defaults to config["momentum"]["x_column"]. + y_column (str, optional): Label of the source 'Y' column. + Defaults to config["momentum"]["y_column"]. + kx_column (str, optional): Label of the destination 'X' column after + momentum calibration. Defaults to config["momentum"]["kx_column"]. + ky_column (str, optional): Label of the destination 'Y' column after + momentum calibration. Defaults to config["momentum"]["ky_column"]. + """ - self._dataframe, metadata = hextof.calibrate_k_division_model( + self._dataframe, metadata = self.mc.calibrate_k_division_model( df=self._dataframe, warp_params=warp_params, - x_column=x_column, - y_column=y_column, - kx_column=kx_column, - ky_column=ky_column, - config=self._config, + **kwargs, ) self._attributes.add( metadata, @@ -1319,6 +1254,35 @@ def calibrate_k_division_model( duplicate_policy="raise", ) + def save_k_division_model( + self, + filename: str = None, + overwrite: bool = False, + ) -> None: + """save the generated k division model parameters to the folder config file. + + + Args: + filename (str, optional): Filename of the config dictionary to save to. + Defaults to "sed_config.yaml" in the current folder. + overwrite (bool, optional): Option to overwrite the present dictionary. + Defaults to False. + """ + if filename is None: + filename = "sed_config.yaml" + params = {} + try: + for key in ["center", "k0", "k1", "k2", "gamma"]: + params[key] = self.mc.k_division_model[key] + except KeyError as exc: + raise KeyError( + "k division model parameters not found, need to generate parameters first!", + ) from exc + + config: Dict[str, Any] = {"momentum": {"k_division_model": params}} + save_config(config, filename, overwrite) + print(f"Saved k division model parameters to {filename}") + def pre_binning( self, df_partitions: int = 100, From f16134b7a46d2c2a3a2ac7c1d95fb038e21889d4 Mon Sep 17 00:00:00 2001 From: Steinn Ymir Agustsson Date: Thu, 16 Nov 2023 23:46:13 +0100 Subject: [PATCH 7/7] linting --- sed/calibrator/momentum.py | 7 +++++-- sed/core/processor.py | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sed/calibrator/momentum.py b/sed/calibrator/momentum.py index 1070a62f..59546946 100644 --- a/sed/calibrator/momentum.py +++ b/sed/calibrator/momentum.py @@ -1844,7 +1844,7 @@ def gather_calibration_metadata(self, calibration: dict = None) -> dict: def calibrate_k_division_model( self, df: Union[pd.DataFrame, dask.dataframe.DataFrame], - warp_params: Union[dict[str, Any], Sequence[float]] = None, + warp_params: Union[Dict[str, Any], Sequence[float]] = None, x_column: str = None, y_column: str = None, kx_column: str = None, @@ -2225,7 +2225,7 @@ def calibrate_k_division_model( y_column: str = None, kx_column: str = None, ky_column: str = None, -) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: +) -> dask.dataframe.DataFrame: """K calibration based on the division model This function returns the distorted coordinates given the undistorted ones @@ -2255,6 +2255,9 @@ def calibrate_k_division_model( If None, defaults to x_column. ky_column (str, optional): Name of the target calibrated x column. If None, defaults to y_column. + + Returns: + df (dask.dataframe.DataFrame): Dataframe with added columns """ if kx_column is None: kx_column = x_column diff --git a/sed/core/processor.py b/sed/core/processor.py index 6d7a7d0f..e6a2db59 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1563,7 +1563,9 @@ def save_k_division_model( overwrite: bool = False, ) -> None: """save the generated k division model parameters to the folder config file. - + + + Args: filename (str, optional): Filename of the config dictionary to save to. Defaults to "sed_config.yaml" in the current folder. @@ -1575,7 +1577,7 @@ def save_k_division_model( params = {} try: for key in ["center", "k0", "k1", "k2", "gamma"]: - params[key] = self.mc.k_division_model[key] + params[key] = self.mc.division_model_params[key] except KeyError as exc: raise KeyError( "k division model parameters not found, need to generate parameters first!",