Skip to content

Commit

Permalink
Correct merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Willemsen committed Nov 7, 2023
1 parent d96e13d commit 93923f9
Showing 1 changed file with 0 additions and 187 deletions.
187 changes: 0 additions & 187 deletions src/dodal/devices/fast_grid_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,193 +13,6 @@

from dodal.devices.fast_grid_scan_common import GridScanCompleteStatus, GridScanParams
from dodal.devices.status import await_value
<<<<<<< HEAD
=======
from dodal.parameters.experiment_parameter_base import AbstractExperimentParameterBase


@dataclass
class GridAxis:
start: float
step_size: float
full_steps: int

def steps_to_motor_position(self, steps):
"""Gives the motor position based on steps, where steps are 0 indexed"""
return self.start + self.step_size * steps

@property
def end(self):
"""Gives the point where the final frame is taken"""
# Note that full_steps is one indexed e.g. if there is one step then the end is
# refering to the first position
return self.steps_to_motor_position(self.full_steps - 1)

def is_within(self, steps):
return 0 <= steps <= self.full_steps


class GridScanParams(BaseModel, AbstractExperimentParameterBase):
"""
Holder class for the parameters of a grid scan in a similar
layout to EPICS.
Motion program will do a grid in x-y then rotate omega +90 and perform
a grid in x-z.
The grid specified is where data is taken e.g. it can be assumed the first frame is
at x_start, y1_start, z1_start and subsequent frames are N*step_size away.
"""

x_steps: int = 1
y_steps: int = 1
z_steps: int = 0
x_step_size: float = 0.1
y_step_size: float = 0.1
z_step_size: float = 0.1
dwell_time_ms: float = 0.1
x_start: float = 0.1
y1_start: float = 0.1
y2_start: float = 0.1
z1_start: float = 0.1
z2_start: float = 0.1
x_axis: GridAxis = GridAxis(0, 0, 0)
y_axis: GridAxis = GridAxis(0, 0, 0)
z_axis: GridAxis = GridAxis(0, 0, 0)

class Config:
arbitrary_types_allowed = True
fields = {
"x_axis": {"exclude": True},
"y_axis": {"exclude": True},
"z_axis": {"exclude": True},
}

@validator("x_axis", always=True)
def _get_x_axis(cls, x_axis: GridAxis, values: dict[str, Any]) -> GridAxis:
return GridAxis(values["x_start"], values["x_step_size"], values["x_steps"])

@validator("y_axis", always=True)
def _get_y_axis(cls, y_axis: GridAxis, values: dict[str, Any]) -> GridAxis:
return GridAxis(values["y1_start"], values["y_step_size"], values["y_steps"])

@validator("z_axis", always=True)
def _get_z_axis(cls, z_axis: GridAxis, values: dict[str, Any]) -> GridAxis:
return GridAxis(values["z2_start"], values["z_step_size"], values["z_steps"])

def is_valid(self, limits: XYZLimitBundle) -> bool:
"""
Validates scan parameters
:param limits: The motor limits against which to validate
the parameters
:return: True if the scan is valid
"""
x_in_limits = limits.x.is_within(self.x_axis.start) and limits.x.is_within(
self.x_axis.end
)
y_in_limits = limits.y.is_within(self.y_axis.start) and limits.y.is_within(
self.y_axis.end
)

first_grid_in_limits = (
x_in_limits and y_in_limits and limits.z.is_within(self.z1_start)
)

z_in_limits = limits.z.is_within(self.z_axis.start) and limits.z.is_within(
self.z_axis.end
)

second_grid_in_limits = (
x_in_limits and z_in_limits and limits.y.is_within(self.y2_start)
)

return first_grid_in_limits and second_grid_in_limits

def get_num_images(self):
return self.x_steps * self.y_steps + self.x_steps * self.z_steps

@property
def is_3d_grid_scan(self):
return self.z_steps > 0

def grid_position_to_motor_position(self, grid_position: ndarray) -> ndarray:
"""Converts a grid position, given as steps in the x, y, z grid,
to a real motor position.
:param grid_position: The x, y, z position in grid steps
:return: The motor position this corresponds to.
:raises: IndexError if the desired position is outside the grid."""
for position, axis in zip(
grid_position, [self.x_axis, self.y_axis, self.z_axis]
):
if not axis.is_within(position):
raise IndexError(f"{grid_position} is outside the bounds of the grid")

return np.array(
[
self.x_axis.steps_to_motor_position(grid_position[0]),
self.y_axis.steps_to_motor_position(grid_position[1]),
self.z_axis.steps_to_motor_position(grid_position[2]),
]
)


class GridScanCompleteStatus(DeviceStatus):
"""
A Status for the grid scan completion
A special status object that notifies watchers (progress bars)
based on comparing device.expected_images to device.position_counter.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_ts = time.time()

self.device.position_counter.subscribe(self._notify_watchers)
self.device.status.subscribe(self._running_changed)

self._name = self.device.name
self._target_count = self.device.expected_images.get()

def _notify_watchers(self, value, *args, **kwargs):
if not self._watchers:
return
time_elapsed = time.time() - self.start_ts
try:
fraction = 1 - value / self._target_count
except ZeroDivisionError:
fraction = 0
time_remaining = 0
except Exception as e:
fraction = None
time_remaining = None
self.set_exception(e)
self.clean_up()
else:
time_remaining = time_elapsed / fraction
for watcher in self._watchers:
watcher(
name=self._name,
current=value,
initial=0,
target=self._target_count,
unit="images",
precision=0,
fraction=fraction,
time_elapsed=time_elapsed,
time_remaining=time_remaining,
)

def _running_changed(self, value=None, old_value=None, **kwargs):
if (old_value == 1) and (value == 0):
self.set_finished()
self.clean_up()

def clean_up(self):
self.device.position_counter.clear_sub(self._notify_watchers)
self.device.status.clear_sub(self._running_changed)
>>>>>>> origin/main


class FastGridScan(Device):
Expand Down

0 comments on commit 93923f9

Please sign in to comment.