Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: ensure np.interp gets an increasing sequence in LUTs #1282

Merged
merged 12 commits into from
Oct 16, 2024
Merged
35 changes: 35 additions & 0 deletions docs/source/upcoming_release_notes/1282-fix_decr_lut.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
1282 fix_decr_lut
#################

API Breaks
----------
- N/A

Library Features
----------------
- N/A

Device Features
---------------
- N/A

New Devices
-----------
- N/A

Bugfixes
--------
- Fix an issue where the LookupTablePositioner would fail silently if the
lookup table was not strictly increasing in both axes.
Lookup tables that are strictly decreasing in either axis
will now be supported.
Lookup tables that have inconsistent ordering in either axis will
log a warning when the conversion is done.

Maintenance
-----------
- N/A

Contributors
------------
- zllentz
93 changes: 78 additions & 15 deletions pcdsdevices/pseudopos.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,7 @@ def __init__(self, *args,
@pseudo_position_argument
def forward(self, pseudo_pos: tuple) -> tuple:
'''
Calculate a RealPosition from a given PseudoPosition

Must be defined on the subclass.
Calculate the real motor position given the pseudo position, using the lookup table.

Parameters
----------
Expand All @@ -1009,22 +1007,20 @@ def forward(self, pseudo_pos: tuple) -> tuple:
The real position output, a namedtuple.
'''
values = pseudo_pos._asdict()

pseudo_field, = self.PseudoPosition._fields
real_field, = self.RealPosition._fields
pseudo_field, real_field = self._get_field_names()
xp, fp = self._load_table_arrays(x_name=pseudo_field, f_name=real_field)

real_value = np.interp(
values[pseudo_field],
self._table_data_by_name[pseudo_field],
self._table_data_by_name[real_field]
xp,
fp,
)
return self.RealPosition(**{real_field: real_value})

@real_position_argument
def inverse(self, real_pos: tuple) -> tuple:
'''Calculate a PseudoPosition from a given RealPosition

Must be defined on the subclass.
'''
Calculate the pseudo motor position given the real position, using the lookup table.

Parameters
----------
Expand All @@ -1037,16 +1033,83 @@ def inverse(self, real_pos: tuple) -> tuple:
The pseudo position output
'''
values = real_pos._asdict()
pseudo_field, = self.PseudoPosition._fields
real_field, = self.RealPosition._fields
pseudo_field, real_field = self._get_field_names()
xp, fp = self._load_table_arrays(x_name=real_field, f_name=pseudo_field)

pseudo_value = np.interp(
values[real_field],
self._table_data_by_name[real_field],
self._table_data_by_name[pseudo_field]
xp,
fp,
)
return self.PseudoPosition(**{pseudo_field: pseudo_value})

def _get_field_names(self) -> tuple[str, str]:
"""
Returns the name of the pseudo field and the name of the real field.

Returns
-------
fields: tuple of str
(pseudo_field, real_field)
"""
pseudo_field, = self.PseudoPosition._fields
real_field, = self.RealPosition._fields

return pseudo_field, real_field

def _load_table_arrays(self, x_name: str, f_name: str) -> tuple[np.ndarray, np.ndarray]:
"""
Load array data from the lookup table in a format ready for np.interp.

This returns the xp and fp arguments that np.interp is expecting.
xp is the x-coordinates of the data points, and
fp is the y-coordinates of the data points.

np.interp is expecting xp to be strictly increasing.
If xp is not strictly increasing, this function will try reversing
the array ordering to make it so, which will work if xp is strictly
decreasing.

If xp is neither strictly increasing nor strictly decreasing
and therefore cannot be coerced into a strictly increasing sequence,
this will show a warning but continue anyway.

Parameters
----------
x_name: str
The name associated with the x-axis data.
f_name: str
The name associated with the y-axis data.

Returns
xp, fp: tuple of np.ndarray
The xp and fp arguments needed for np.interp.
"""
xp = self._table_data_by_name[x_name]
fp = self._table_data_by_name[f_name]

# xp must be strictly increasing for np.interp
if not is_strictly_increasing(xp):
# Try reverse
xp = xp[::-1]
fp = fp[::-1]
# Check one more time in case neither direction works
if not is_strictly_increasing(xp):
self.log.warning(
"Lookup table is not strictly increasing or decreasing! "
"This will give inconsistent results!"
)

return xp, fp


def is_strictly_increasing(arr: np.ndarray) -> bool:
"""
Returns True if axis 1 of arr is strictly increasing and False otherwise.
"""
# see numpy.interp docs https://numpy.org/doc/stable/reference/generated/numpy.interp.html
return np.all(np.diff(arr) > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess monotonically increasing does mean strictly increasing (never decreasing or staying the same). We just had the wrong idea of "monotonically increasing" before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually re-read the docs and found that they specified strictly increasing instead of monotonically increasing, the difference being that a monotonic sequence is allowed to repeat adjacent numbers or have a zero slope.



class OffsetMotorBase(FltMvInterface, PseudoPositioner):
"""
Expand Down
54 changes: 40 additions & 14 deletions pcdsdevices/tests/test_pseudopos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ..pseudopos import (DelayBase, LookupTablePositioner, OffsetMotorBase,
PseudoSingleInterface, SimDelayStage, SyncAxesBase,
SyncAxis, SyncAxisOffsetMode)
SyncAxis, SyncAxisOffsetMode, is_strictly_increasing)
from ..sim import FastMotor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -190,8 +190,19 @@ def test_subcls_warning():
OffsetMotorBase('prefix', name='name')


def test_lut_positioner():
logger.debug('test_lut_positioner')
@pytest.mark.parametrize(
"real_sign,pseudo_sign",
(
(1, 1),
(1, -1),
(-1, 1),
(-1, -1),
)
)
def test_lut_positioner(real_sign: bool, pseudo_sign: bool):
logger.debug('test_lut_positioner_normal')
rs = real_sign
ps = pseudo_sign

class LimitSettableSoftPositioner(SoftPositioner):
@property
Expand All @@ -206,6 +217,7 @@ class MyLUTPositioner(LookupTablePositioner):
pseudo = Cpt(PseudoSingleInterface)
real = Cpt(LimitSettableSoftPositioner)

signs = np.asarray([[rs, ps]] * 8)
table = np.asarray(
[[0, 40],
[1, 50],
Expand All @@ -216,21 +228,35 @@ class MyLUTPositioner(LookupTablePositioner):
[8, 300],
[9, 400],
]
)
) * signs
column_names = ['real', 'pseudo']
lut = MyLUTPositioner('', table=table, column_names=column_names,
name='lut')

np.testing.assert_allclose(lut.forward(60)[0], 2)
np.testing.assert_allclose(lut.inverse(7)[0], 200)
np.testing.assert_allclose(lut.inverse(1.5)[0], 55)

lut.move(100, wait=True)
np.testing.assert_allclose(lut.pseudo.position, 100)
np.testing.assert_allclose(lut.real.position, 6)

assert lut.real.limits == (0, 9)
assert lut.pseudo.limits == (40, 400)
np.testing.assert_allclose(lut.forward(60 * ps)[0], 2 * rs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how did I not know np.testing existed...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, I didn't actually read the functions I was adding the sign swaps to, I was only looking for numbers and figuring out whether they were the real positions or the pseudo positions, so I also didn't realize np.testing existed.

My test here is just using the pre-existing test from Ken but adding the cases where either side is decreasing instead of increasing.

np.testing.assert_allclose(lut.inverse(7 * rs)[0], 200 * ps)
np.testing.assert_allclose(lut.inverse(1.5 * rs)[0], 55 * ps)

lut.move(100 * ps, wait=True)
np.testing.assert_allclose(lut.pseudo.position, 100 * ps)
np.testing.assert_allclose(lut.real.position, 6 * rs)

assert lut.real.limits == tuple(sorted([0 * rs, 9 * rs]))
assert lut.pseudo.limits == tuple(sorted([40 * ps, 400 * ps]))


@pytest.mark.parametrize(
"input,expected",
(
(np.asarray((0, 1, 2, 3, 4, 5)), True),
(np.asarray((0, 1, 2, 3, 4, 4)), False),
(np.asarray((0, -1, -2, -3, -4, -5)), False),
(np.asarray((0, 2, 4, -3, 5, 10)), False),
),
)
def test_increasing_helper(input: np.ndarray, expected: bool):
logger.debug("test_increasing_helper")
assert is_strictly_increasing(input) == expected


FakeDelayBase = make_fake_device(DelayBase)
Expand Down