Skip to content

Commit

Permalink
Improve and document get_sensor
Browse files Browse the repository at this point in the history
The new Brick.get_sensor allows to give an explicit class when
autodetection can not work.
  • Loading branch information
schodet committed Dec 27, 2021
1 parent e2fc88f commit 097f5fb
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 51 deletions.
2 changes: 0 additions & 2 deletions docs/api/brick.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ Brick
Motors and Sensors
------------------

This part is still a work in progress.

.. automethod:: Brick.get_motor
.. automethod:: Brick.get_sensor

Expand Down
6 changes: 6 additions & 0 deletions docs/migration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ NXT-Python 2 NXT-Python 3
:attr:`!Mode.MASK_SLOPE` Removed
=============================== ============================

You can now create :mod:`~nxt.sensor` objects using
:meth:`nxt.brick.Brick.get_sensor`, however direct creation still works. For
digital sensors with identification information, this can automatically detect
the sensor type as with previous version. The new `cls` argument allows
creating a sensor object using another class.


Text String or Binary String
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
46 changes: 32 additions & 14 deletions nxt/brick.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,20 +272,6 @@ def find_modules(self, pattern="*.*"):
finally:
self.module_close(handle)

def get_sensor(self, port):
"""Tries to detect the sensor type and return the correct sensor object.
:param nxt.sensor.Port port: Input port identifier.
:return: A sensor object.
:rtype: nxt.sensor.Sensor
:raises nxt.sensor.digital.SearchError: When sensor can not be identified.
Only work for digital sensors with identification information.
"""
base_sensor = nxt.sensor.digital.BaseDigitalSensor(self, port, False)
info = base_sensor.get_sensor_info()
return nxt.sensor.digital.find_class(info)(self, port, check_compatible=False)

def get_motor(self, port):
"""Return a motor object connected to one of the brick output port.
Expand All @@ -295,6 +281,38 @@ def get_motor(self, port):
"""
return nxt.motor.Motor(self, port)

def get_sensor(self, port, cls=None, *args, **kwargs):
"""Return a sensor object connected to one of the brick input port.
:param nxt.sensor.Port port: Input port identifier.
:param cls: Sensor class, or None to autodetect.
:type cls: typing.Type[nxt.sensor.Sensor] or None
:param args: Additional constructor positional arguments when `cls` is given.
:param kwargs: Additional constructor keyword arguments when `cls` is given.
:return: A sensor object.
:rtype: nxt.sensor.Sensor
:raises nxt.sensor.digital.SearchError: When sensor can not be identified.
When `cls` is not given or ``None``, try to detect the sensor type and return
the correct sensor object. This only works for digital sensors with
identification information.
For autodetection to work, the module containing the sensor class must be
imported at least once. See modules in :mod:`nxt.sensor`.
"""
if cls is None:
if args or kwargs:
raise ValueError("extra arguments with autodetect")
base_sensor = nxt.sensor.digital.BaseDigitalSensor(
self, port, check_compatible=False
)
info = base_sensor.get_sensor_info()
return nxt.sensor.digital.find_class(info)(
self, port, check_compatible=False
)
else:
return cls(self, port, *args, **kwargs)

def _cmd(self, tgram):
"""Send a message to the NXT brick and read reply.
Expand Down
72 changes: 37 additions & 35 deletions tests/test_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestGeneric:
"""Test non digital sensors."""

def test_analog(self, mbrick):
s = nxt.sensor.analog.BaseAnalogSensor(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.analog.BaseAnalogSensor)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.SWITCH, Mode.BOOL, 1, 2, 3, 4),
]
Expand All @@ -66,7 +66,7 @@ def test_touch(self, mbrick):
assert (
nxt.sensor.generic.Touch.get_sample is nxt.sensor.generic.Touch.is_pressed
)
s = nxt.sensor.generic.Touch(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Touch)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.SWITCH, Mode.BOOL, 1023, 1023, 0, 1023),
(Port.S1, True, False, Type.SWITCH, Mode.BOOL, 183, 183, 1, 183),
Expand All @@ -84,7 +84,7 @@ def test_light(self, mbrick):
nxt.sensor.generic.Light.get_sample
is nxt.sensor.generic.Light.get_lightness
)
s = nxt.sensor.generic.Light(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Light)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 726, 250, 250, 250),
(Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 823, 107, 107, 107),
Expand All @@ -103,7 +103,7 @@ def test_sound(self, mbrick):
assert (
nxt.sensor.generic.Sound.get_sample is nxt.sensor.generic.Sound.get_loudness
)
s = nxt.sensor.generic.Sound(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Sound)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.SOUND_DBA, Mode.RAW, 999, 15, 15, 15),
(Port.S1, True, False, Type.SOUND_DB, Mode.RAW, 999, 15, 15, 15),
Expand All @@ -120,7 +120,7 @@ def test_sound(self, mbrick):

def test_color(self, mbrick):
assert nxt.sensor.generic.Color.get_sample is nxt.sensor.generic.Color.get_color
s = nxt.sensor.generic.Color(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Color)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.COLOR_FULL, Mode.RAW, 0, 0, 4, 0),
(Port.S1, True, False, Type.COLOR_FULL, Mode.RAW, 0, 0, 4, 0),
Expand Down Expand Up @@ -153,7 +153,7 @@ class TestDigital:
sensor_type_bin = b"Sonar\0\0\0"

def test_get_sensor_info(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
mbrick.ls_get_status.return_value = 8
mbrick.ls_read.side_effect = [
self.version_bin,
Expand Down Expand Up @@ -198,7 +198,7 @@ class DummySensor(nxt.sensor.digital.BaseDigitalSensor):
assert "WARNING" in caplog.text

def test_write_value(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
s.I2C_ADDRESS = dict(s.I2C_ADDRESS, command=(0x41, "B"))
s.write_value("command", (0x12,))
assert mbrick.mock_calls == [
Expand All @@ -207,7 +207,7 @@ def test_write_value(self, mbrick):
]

def test_not_ready(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
mbrick.ls_get_status.side_effect = [nxt.error.I2CPendingError("pending"), 8]
mbrick.ls_read.return_value = self.product_id_bin
assert s.read_value("product_id") == (self.product_id_bin,)
Expand All @@ -220,7 +220,7 @@ def test_not_ready(self, mbrick):
]

def test_status_timeout(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
mbrick.ls_get_status.side_effect = (
[nxt.error.I2CPendingError("pending")] * 30 * 3
)
Expand All @@ -234,7 +234,7 @@ def test_status_timeout(self, mbrick):
assert mbrick.mock_calls == mock_calls

def test_read_error(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
mbrick.ls_get_status.side_effect = [8, 8]
mbrick.ls_read.side_effect = [self.product_id_bin[1:], self.product_id_bin]
assert s.read_value("product_id") == (self.product_id_bin,)
Expand All @@ -250,7 +250,7 @@ def test_read_error(self, mbrick):
]

def test_read_timeout(self, mbrick):
s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False)
mbrick.ls_get_status.return_value = 8
mbrick.ls_read.return_value = self.product_id_bin[1:]
with pytest.raises(nxt.error.I2CError):
Expand Down Expand Up @@ -321,7 +321,9 @@ def test_ultrasonic(self, mbrick, mdigital):
nxt.sensor.generic.Ultrasonic.get_sample
is nxt.sensor.generic.Ultrasonic.get_distance
)
s = nxt.sensor.generic.Ultrasonic(mbrick, Port.S1, check_compatible=False)
s = mbrick.get_sensor(
Port.S1, nxt.sensor.generic.Ultrasonic, check_compatible=False
)
mdigital.read_value.side_effect = [
(42,),
(b"10E-2m\0",),
Expand Down Expand Up @@ -351,7 +353,7 @@ def test_temperature(self, mbrick, mdigital):
nxt.sensor.generic.Temperature.get_sample
is nxt.sensor.generic.Temperature.get_deg_c
)
s = nxt.sensor.generic.Temperature(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Temperature)
mdigital.read_value.return_value = (1600 * 16,)
assert s.get_deg_c() == 100
assert s.get_deg_f() == 212
Expand All @@ -365,7 +367,7 @@ class TestMindsensors:
"""Test Mindsensors sensors."""

def test_sumoeyes(self, mbrick):
s = nxt.sensor.mindsensors.SumoEyes(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.SumoEyes)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 0, 0, 0, 0),
(Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 0, 350, 0, 0),
Expand Down Expand Up @@ -396,7 +398,7 @@ def test_compassv2(self, mbrick, mdigital):
nxt.sensor.mindsensors.Compassv2.get_sample
is nxt.sensor.mindsensors.Compassv2.get_heading
)
s = nxt.sensor.mindsensors.Compassv2(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.Compassv2, False)
mdigital.read_value.return_value = (300,)
# TODO: should return degrees (divide by 10).
assert s.get_heading() == 300
Expand All @@ -413,7 +415,7 @@ def test_dist(self, mbrick, mdigital):
nxt.sensor.mindsensors.DIST.get_sample
is nxt.sensor.mindsensors.DIST.get_distance
)
s = nxt.sensor.mindsensors.DIST(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.DIST, False)
# TODO: get rid of ord, adapt format.
mdigital.read_value.side_effect = [(100,), (ord("2"),), (42,), (43,), (44,)]
assert s.get_distance() == 100
Expand All @@ -432,7 +434,7 @@ def test_dist(self, mbrick, mdigital):
]

def test_rtc(self, mbrick, mdigital):
s = nxt.sensor.mindsensors.RTC(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.RTC)
# TODO: this one is completely broken:
# - Return str instead of int.
# - Bad handling of hour format.
Expand All @@ -447,7 +449,7 @@ def test_accl(self, mbrick, mdigital):
nxt.sensor.mindsensors.ACCL.get_sample
is nxt.sensor.mindsensors.ACCL.get_all_accel
)
s = nxt.sensor.mindsensors.ACCL(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.ACCL, False)
# TODO: get rid of ord, adapt format.
mdigital.read_value.side_effect = [
(ord("2"),),
Expand Down Expand Up @@ -483,7 +485,7 @@ def test_accl(self, mbrick, mdigital):

def test_mtrmux(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.mindsensors.MTRMUX, "get_sample")
s = nxt.sensor.mindsensors.MTRMUX(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.MTRMUX, False)
mdigital.read_value.side_effect = [(1,), (2,)]
s.command(s.Commands.FLOAT)
s.set_direction(1, 1)
Expand All @@ -503,7 +505,7 @@ def test_lineleader(self, mbrick, mdigital):
nxt.sensor.mindsensors.LineLeader.get_sample
is nxt.sensor.mindsensors.LineLeader.get_reading_all
)
s = nxt.sensor.mindsensors.LineLeader(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.LineLeader, False)
mdigital.read_value.side_effect = [
(-10,),
(50,),
Expand Down Expand Up @@ -541,7 +543,7 @@ def test_lineleader(self, mbrick, mdigital):

def test_servo(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.mindsensors.Servo, "get_sample")
s = nxt.sensor.mindsensors.Servo(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.Servo, False)
# TODO: command can not work, can not fit two bytes in one byte.
mdigital.read_value.side_effect = [(1,), (42,), (43,)]
assert s.get_bat_level() == 1
Expand All @@ -561,7 +563,7 @@ def test_servo(self, mbrick, mdigital):

def test_mmx(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.mindsensors.MMX, "get_sample")
s = nxt.sensor.mindsensors.MMX(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.MMX, False)
mdigital.read_value.side_effect = [
(1,),
(0xAA,),
Expand Down Expand Up @@ -607,7 +609,7 @@ def test_mmx(self, mbrick, mdigital):

def test_hid(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.mindsensors.HID, "get_sample")
s = nxt.sensor.mindsensors.HID(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.HID, False)
s.command(s.Commands.ASCII_MODE)
s.set_modifier(42)
s.write_data("a")
Expand All @@ -618,7 +620,7 @@ def test_hid(self, mbrick, mdigital):
]

def test_ps2(self, mbrick, mdigital):
s = nxt.sensor.mindsensors.PS2(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.PS2, False)
mdigital.read_value.side_effect = [
(42,),
(0x55,),
Expand Down Expand Up @@ -671,7 +673,7 @@ def test_compass(self, mbrick, mdigital):
nxt.sensor.hitechnic.Compass.get_sample
is nxt.sensor.hitechnic.Compass.get_heading
)
s = nxt.sensor.hitechnic.Compass(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Compass, False)
mdigital.read_value.return_value = (10,)
assert s.get_heading() == 30
assert s.get_relative_heading(0) == 30
Expand All @@ -698,7 +700,7 @@ def test_accelerometer(self, mbrick, mdigital):
nxt.sensor.hitechnic.Accelerometer.get_sample
is nxt.sensor.hitechnic.Accelerometer.get_acceleration
)
s = nxt.sensor.hitechnic.Accelerometer(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Accelerometer, False)
mdigital.read_value.return_value = (0x12, 0x23, -0x32, 0x3, 0x0, 0x2)
v = s.get_acceleration()
assert (v.x, v.y, v.z) == (75, 140, -198)
Expand All @@ -711,7 +713,7 @@ def test_irreceiver(self, mbrick, mdigital):
nxt.sensor.hitechnic.IRReceiver.get_sample
is nxt.sensor.hitechnic.IRReceiver.get_speeds
)
s = nxt.sensor.hitechnic.IRReceiver(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.IRReceiver, False)
mdigital.read_value.return_value = (0, -16, 30, -44, 58, -72, 100, -128)
v = s.get_speeds()
assert (v.m1A, v.m1B) == (0, -16)
Expand All @@ -732,7 +734,7 @@ def test_irseekerv2(self, mbrick, mdigital):
nxt.sensor.hitechnic.IRSeekerv2.get_sample
is nxt.sensor.hitechnic.IRSeekerv2.get_ac_values
)
s = nxt.sensor.hitechnic.IRSeekerv2(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.IRSeekerv2, False)
mdigital.read_value.side_effect = [
(5, 42, 43, 44, 45, 46, 44),
(5, 42, 43, 44, 45, 46),
Expand Down Expand Up @@ -772,7 +774,7 @@ def test_eopd(self, mbrick):
nxt.sensor.hitechnic.EOPD.get_sample
is nxt.sensor.hitechnic.EOPD.get_scaled_value
)
s = nxt.sensor.hitechnic.EOPD(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.EOPD)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 523, 0, 0, 0),
(Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 398, 0, 0, 0),
Expand Down Expand Up @@ -800,7 +802,7 @@ def test_colorv2(self, mbrick, mdigital):
nxt.sensor.hitechnic.Colorv2.get_sample
is nxt.sensor.hitechnic.Colorv2.get_active_color
)
s = nxt.sensor.hitechnic.Colorv2(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Colorv2, False)
mdigital.read_value.side_effect = [
(8, 100, 50, 0, 75, 42, 66, 33, 0),
(100, 50, 0, 75),
Expand Down Expand Up @@ -835,7 +837,7 @@ def test_giro(self, mbrick):
nxt.sensor.hitechnic.Gyro.get_sample
is nxt.sensor.hitechnic.Gyro.get_rotation_speed
)
s = nxt.sensor.hitechnic.Gyro(mbrick, Port.S1)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Gyro)
mbrick.get_input_values.side_effect = [
(Port.S1, True, False, Type.ANGLE, Mode.RAW, 0, 0, 42, 0),
(Port.S1, True, False, Type.ANGLE, Mode.RAW, 0, 0, 42, 0),
Expand All @@ -857,7 +859,7 @@ def test_giro(self, mbrick):

def test_prototype(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.hitechnic.Prototype, "get_sample")
s = nxt.sensor.hitechnic.Prototype(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Prototype, False)
mdigital.read_value.side_effect = [
(42, 43, 44, 45, 46),
(0x2A,),
Expand Down Expand Up @@ -890,7 +892,7 @@ def test_prototype(self, mbrick, mdigital):

def test_servocon(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.hitechnic.ServoCon, "get_sample")
s = nxt.sensor.hitechnic.ServoCon(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.ServoCon, False)
mdigital.read_value.side_effect = [
(1,),
(43,),
Expand All @@ -910,7 +912,7 @@ def test_servocon(self, mbrick, mdigital):

def test_motorcon(self, mbrick, mdigital):
assert not hasattr(nxt.sensor.hitechnic.MotorCon, "get_sample")
s = nxt.sensor.hitechnic.MotorCon(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.MotorCon, False)
mdigital.read_value.side_effect = [
(123456,),
(654321,),
Expand Down Expand Up @@ -953,7 +955,7 @@ def test_angle(self, mbrick, mdigital):
nxt.sensor.hitechnic.Angle.get_sample
is nxt.sensor.hitechnic.Angle.get_angle
)
s = nxt.sensor.hitechnic.Angle(mbrick, Port.S1, False)
s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Angle, False)
mdigital.read_value.side_effect = [
(21, 1),
(123456789,),
Expand Down

0 comments on commit 097f5fb

Please sign in to comment.