From 097f5fba3e63a22a8a5017cdf4da2ed727d6f522 Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Mon, 27 Dec 2021 22:38:54 +0100 Subject: [PATCH] Improve and document get_sensor The new Brick.get_sensor allows to give an explicit class when autodetection can not work. --- docs/api/brick.rst | 2 -- docs/migration.rst | 6 ++++ nxt/brick.py | 46 ++++++++++++++++++--------- tests/test_sensors.py | 72 ++++++++++++++++++++++--------------------- 4 files changed, 75 insertions(+), 51 deletions(-) diff --git a/docs/api/brick.rst b/docs/api/brick.rst index b2d377b..0688016 100644 --- a/docs/api/brick.rst +++ b/docs/api/brick.rst @@ -36,8 +36,6 @@ Brick Motors and Sensors ------------------ - This part is still a work in progress. - .. automethod:: Brick.get_motor .. automethod:: Brick.get_sensor diff --git a/docs/migration.rst b/docs/migration.rst index ab70e82..ce7df59 100644 --- a/docs/migration.rst +++ b/docs/migration.rst @@ -117,6 +117,12 @@ NXT-Python 2 NXT-Python 3 :attr:`!Mode.MASK_SLOPE` Removed =============================== ============================ +You can now create :mod:`~nxt.sensor` objects using +:meth:`nxt.brick.Brick.get_sensor`, however direct creation still works. For +digital sensors with identification information, this can automatically detect +the sensor type as with previous version. The new `cls` argument allows +creating a sensor object using another class. + Text String or Binary String ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/nxt/brick.py b/nxt/brick.py index 593c236..d394696 100644 --- a/nxt/brick.py +++ b/nxt/brick.py @@ -272,20 +272,6 @@ def find_modules(self, pattern="*.*"): finally: self.module_close(handle) - def get_sensor(self, port): - """Tries to detect the sensor type and return the correct sensor object. - - :param nxt.sensor.Port port: Input port identifier. - :return: A sensor object. - :rtype: nxt.sensor.Sensor - :raises nxt.sensor.digital.SearchError: When sensor can not be identified. - - Only work for digital sensors with identification information. - """ - base_sensor = nxt.sensor.digital.BaseDigitalSensor(self, port, False) - info = base_sensor.get_sensor_info() - return nxt.sensor.digital.find_class(info)(self, port, check_compatible=False) - def get_motor(self, port): """Return a motor object connected to one of the brick output port. @@ -295,6 +281,38 @@ def get_motor(self, port): """ return nxt.motor.Motor(self, port) + def get_sensor(self, port, cls=None, *args, **kwargs): + """Return a sensor object connected to one of the brick input port. + + :param nxt.sensor.Port port: Input port identifier. + :param cls: Sensor class, or None to autodetect. + :type cls: typing.Type[nxt.sensor.Sensor] or None + :param args: Additional constructor positional arguments when `cls` is given. + :param kwargs: Additional constructor keyword arguments when `cls` is given. + :return: A sensor object. + :rtype: nxt.sensor.Sensor + :raises nxt.sensor.digital.SearchError: When sensor can not be identified. + + When `cls` is not given or ``None``, try to detect the sensor type and return + the correct sensor object. This only works for digital sensors with + identification information. + + For autodetection to work, the module containing the sensor class must be + imported at least once. See modules in :mod:`nxt.sensor`. + """ + if cls is None: + if args or kwargs: + raise ValueError("extra arguments with autodetect") + base_sensor = nxt.sensor.digital.BaseDigitalSensor( + self, port, check_compatible=False + ) + info = base_sensor.get_sensor_info() + return nxt.sensor.digital.find_class(info)( + self, port, check_compatible=False + ) + else: + return cls(self, port, *args, **kwargs) + def _cmd(self, tgram): """Send a message to the NXT brick and read reply. diff --git a/tests/test_sensors.py b/tests/test_sensors.py index f4db6f3..fe41f5a 100644 --- a/tests/test_sensors.py +++ b/tests/test_sensors.py @@ -39,7 +39,7 @@ class TestGeneric: """Test non digital sensors.""" def test_analog(self, mbrick): - s = nxt.sensor.analog.BaseAnalogSensor(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.analog.BaseAnalogSensor) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.SWITCH, Mode.BOOL, 1, 2, 3, 4), ] @@ -66,7 +66,7 @@ def test_touch(self, mbrick): assert ( nxt.sensor.generic.Touch.get_sample is nxt.sensor.generic.Touch.is_pressed ) - s = nxt.sensor.generic.Touch(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Touch) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.SWITCH, Mode.BOOL, 1023, 1023, 0, 1023), (Port.S1, True, False, Type.SWITCH, Mode.BOOL, 183, 183, 1, 183), @@ -84,7 +84,7 @@ def test_light(self, mbrick): nxt.sensor.generic.Light.get_sample is nxt.sensor.generic.Light.get_lightness ) - s = nxt.sensor.generic.Light(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Light) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 726, 250, 250, 250), (Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 823, 107, 107, 107), @@ -103,7 +103,7 @@ def test_sound(self, mbrick): assert ( nxt.sensor.generic.Sound.get_sample is nxt.sensor.generic.Sound.get_loudness ) - s = nxt.sensor.generic.Sound(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Sound) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.SOUND_DBA, Mode.RAW, 999, 15, 15, 15), (Port.S1, True, False, Type.SOUND_DB, Mode.RAW, 999, 15, 15, 15), @@ -120,7 +120,7 @@ def test_sound(self, mbrick): def test_color(self, mbrick): assert nxt.sensor.generic.Color.get_sample is nxt.sensor.generic.Color.get_color - s = nxt.sensor.generic.Color(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Color) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.COLOR_FULL, Mode.RAW, 0, 0, 4, 0), (Port.S1, True, False, Type.COLOR_FULL, Mode.RAW, 0, 0, 4, 0), @@ -153,7 +153,7 @@ class TestDigital: sensor_type_bin = b"Sonar\0\0\0" def test_get_sensor_info(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) mbrick.ls_get_status.return_value = 8 mbrick.ls_read.side_effect = [ self.version_bin, @@ -198,7 +198,7 @@ class DummySensor(nxt.sensor.digital.BaseDigitalSensor): assert "WARNING" in caplog.text def test_write_value(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) s.I2C_ADDRESS = dict(s.I2C_ADDRESS, command=(0x41, "B")) s.write_value("command", (0x12,)) assert mbrick.mock_calls == [ @@ -207,7 +207,7 @@ def test_write_value(self, mbrick): ] def test_not_ready(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) mbrick.ls_get_status.side_effect = [nxt.error.I2CPendingError("pending"), 8] mbrick.ls_read.return_value = self.product_id_bin assert s.read_value("product_id") == (self.product_id_bin,) @@ -220,7 +220,7 @@ def test_not_ready(self, mbrick): ] def test_status_timeout(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) mbrick.ls_get_status.side_effect = ( [nxt.error.I2CPendingError("pending")] * 30 * 3 ) @@ -234,7 +234,7 @@ def test_status_timeout(self, mbrick): assert mbrick.mock_calls == mock_calls def test_read_error(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) mbrick.ls_get_status.side_effect = [8, 8] mbrick.ls_read.side_effect = [self.product_id_bin[1:], self.product_id_bin] assert s.read_value("product_id") == (self.product_id_bin,) @@ -250,7 +250,7 @@ def test_read_error(self, mbrick): ] def test_read_timeout(self, mbrick): - s = nxt.sensor.digital.BaseDigitalSensor(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.digital.BaseDigitalSensor, False) mbrick.ls_get_status.return_value = 8 mbrick.ls_read.return_value = self.product_id_bin[1:] with pytest.raises(nxt.error.I2CError): @@ -321,7 +321,9 @@ def test_ultrasonic(self, mbrick, mdigital): nxt.sensor.generic.Ultrasonic.get_sample is nxt.sensor.generic.Ultrasonic.get_distance ) - s = nxt.sensor.generic.Ultrasonic(mbrick, Port.S1, check_compatible=False) + s = mbrick.get_sensor( + Port.S1, nxt.sensor.generic.Ultrasonic, check_compatible=False + ) mdigital.read_value.side_effect = [ (42,), (b"10E-2m\0",), @@ -351,7 +353,7 @@ def test_temperature(self, mbrick, mdigital): nxt.sensor.generic.Temperature.get_sample is nxt.sensor.generic.Temperature.get_deg_c ) - s = nxt.sensor.generic.Temperature(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.generic.Temperature) mdigital.read_value.return_value = (1600 * 16,) assert s.get_deg_c() == 100 assert s.get_deg_f() == 212 @@ -365,7 +367,7 @@ class TestMindsensors: """Test Mindsensors sensors.""" def test_sumoeyes(self, mbrick): - s = nxt.sensor.mindsensors.SumoEyes(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.SumoEyes) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 0, 0, 0, 0), (Port.S1, True, False, Type.LIGHT_ACTIVE, Mode.RAW, 0, 350, 0, 0), @@ -396,7 +398,7 @@ def test_compassv2(self, mbrick, mdigital): nxt.sensor.mindsensors.Compassv2.get_sample is nxt.sensor.mindsensors.Compassv2.get_heading ) - s = nxt.sensor.mindsensors.Compassv2(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.Compassv2, False) mdigital.read_value.return_value = (300,) # TODO: should return degrees (divide by 10). assert s.get_heading() == 300 @@ -413,7 +415,7 @@ def test_dist(self, mbrick, mdigital): nxt.sensor.mindsensors.DIST.get_sample is nxt.sensor.mindsensors.DIST.get_distance ) - s = nxt.sensor.mindsensors.DIST(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.DIST, False) # TODO: get rid of ord, adapt format. mdigital.read_value.side_effect = [(100,), (ord("2"),), (42,), (43,), (44,)] assert s.get_distance() == 100 @@ -432,7 +434,7 @@ def test_dist(self, mbrick, mdigital): ] def test_rtc(self, mbrick, mdigital): - s = nxt.sensor.mindsensors.RTC(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.RTC) # TODO: this one is completely broken: # - Return str instead of int. # - Bad handling of hour format. @@ -447,7 +449,7 @@ def test_accl(self, mbrick, mdigital): nxt.sensor.mindsensors.ACCL.get_sample is nxt.sensor.mindsensors.ACCL.get_all_accel ) - s = nxt.sensor.mindsensors.ACCL(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.ACCL, False) # TODO: get rid of ord, adapt format. mdigital.read_value.side_effect = [ (ord("2"),), @@ -483,7 +485,7 @@ def test_accl(self, mbrick, mdigital): def test_mtrmux(self, mbrick, mdigital): assert not hasattr(nxt.sensor.mindsensors.MTRMUX, "get_sample") - s = nxt.sensor.mindsensors.MTRMUX(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.MTRMUX, False) mdigital.read_value.side_effect = [(1,), (2,)] s.command(s.Commands.FLOAT) s.set_direction(1, 1) @@ -503,7 +505,7 @@ def test_lineleader(self, mbrick, mdigital): nxt.sensor.mindsensors.LineLeader.get_sample is nxt.sensor.mindsensors.LineLeader.get_reading_all ) - s = nxt.sensor.mindsensors.LineLeader(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.LineLeader, False) mdigital.read_value.side_effect = [ (-10,), (50,), @@ -541,7 +543,7 @@ def test_lineleader(self, mbrick, mdigital): def test_servo(self, mbrick, mdigital): assert not hasattr(nxt.sensor.mindsensors.Servo, "get_sample") - s = nxt.sensor.mindsensors.Servo(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.Servo, False) # TODO: command can not work, can not fit two bytes in one byte. mdigital.read_value.side_effect = [(1,), (42,), (43,)] assert s.get_bat_level() == 1 @@ -561,7 +563,7 @@ def test_servo(self, mbrick, mdigital): def test_mmx(self, mbrick, mdigital): assert not hasattr(nxt.sensor.mindsensors.MMX, "get_sample") - s = nxt.sensor.mindsensors.MMX(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.MMX, False) mdigital.read_value.side_effect = [ (1,), (0xAA,), @@ -607,7 +609,7 @@ def test_mmx(self, mbrick, mdigital): def test_hid(self, mbrick, mdigital): assert not hasattr(nxt.sensor.mindsensors.HID, "get_sample") - s = nxt.sensor.mindsensors.HID(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.HID, False) s.command(s.Commands.ASCII_MODE) s.set_modifier(42) s.write_data("a") @@ -618,7 +620,7 @@ def test_hid(self, mbrick, mdigital): ] def test_ps2(self, mbrick, mdigital): - s = nxt.sensor.mindsensors.PS2(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.mindsensors.PS2, False) mdigital.read_value.side_effect = [ (42,), (0x55,), @@ -671,7 +673,7 @@ def test_compass(self, mbrick, mdigital): nxt.sensor.hitechnic.Compass.get_sample is nxt.sensor.hitechnic.Compass.get_heading ) - s = nxt.sensor.hitechnic.Compass(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Compass, False) mdigital.read_value.return_value = (10,) assert s.get_heading() == 30 assert s.get_relative_heading(0) == 30 @@ -698,7 +700,7 @@ def test_accelerometer(self, mbrick, mdigital): nxt.sensor.hitechnic.Accelerometer.get_sample is nxt.sensor.hitechnic.Accelerometer.get_acceleration ) - s = nxt.sensor.hitechnic.Accelerometer(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Accelerometer, False) mdigital.read_value.return_value = (0x12, 0x23, -0x32, 0x3, 0x0, 0x2) v = s.get_acceleration() assert (v.x, v.y, v.z) == (75, 140, -198) @@ -711,7 +713,7 @@ def test_irreceiver(self, mbrick, mdigital): nxt.sensor.hitechnic.IRReceiver.get_sample is nxt.sensor.hitechnic.IRReceiver.get_speeds ) - s = nxt.sensor.hitechnic.IRReceiver(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.IRReceiver, False) mdigital.read_value.return_value = (0, -16, 30, -44, 58, -72, 100, -128) v = s.get_speeds() assert (v.m1A, v.m1B) == (0, -16) @@ -732,7 +734,7 @@ def test_irseekerv2(self, mbrick, mdigital): nxt.sensor.hitechnic.IRSeekerv2.get_sample is nxt.sensor.hitechnic.IRSeekerv2.get_ac_values ) - s = nxt.sensor.hitechnic.IRSeekerv2(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.IRSeekerv2, False) mdigital.read_value.side_effect = [ (5, 42, 43, 44, 45, 46, 44), (5, 42, 43, 44, 45, 46), @@ -772,7 +774,7 @@ def test_eopd(self, mbrick): nxt.sensor.hitechnic.EOPD.get_sample is nxt.sensor.hitechnic.EOPD.get_scaled_value ) - s = nxt.sensor.hitechnic.EOPD(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.EOPD) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 523, 0, 0, 0), (Port.S1, True, False, Type.LIGHT_INACTIVE, Mode.RAW, 398, 0, 0, 0), @@ -800,7 +802,7 @@ def test_colorv2(self, mbrick, mdigital): nxt.sensor.hitechnic.Colorv2.get_sample is nxt.sensor.hitechnic.Colorv2.get_active_color ) - s = nxt.sensor.hitechnic.Colorv2(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Colorv2, False) mdigital.read_value.side_effect = [ (8, 100, 50, 0, 75, 42, 66, 33, 0), (100, 50, 0, 75), @@ -835,7 +837,7 @@ def test_giro(self, mbrick): nxt.sensor.hitechnic.Gyro.get_sample is nxt.sensor.hitechnic.Gyro.get_rotation_speed ) - s = nxt.sensor.hitechnic.Gyro(mbrick, Port.S1) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Gyro) mbrick.get_input_values.side_effect = [ (Port.S1, True, False, Type.ANGLE, Mode.RAW, 0, 0, 42, 0), (Port.S1, True, False, Type.ANGLE, Mode.RAW, 0, 0, 42, 0), @@ -857,7 +859,7 @@ def test_giro(self, mbrick): def test_prototype(self, mbrick, mdigital): assert not hasattr(nxt.sensor.hitechnic.Prototype, "get_sample") - s = nxt.sensor.hitechnic.Prototype(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Prototype, False) mdigital.read_value.side_effect = [ (42, 43, 44, 45, 46), (0x2A,), @@ -890,7 +892,7 @@ def test_prototype(self, mbrick, mdigital): def test_servocon(self, mbrick, mdigital): assert not hasattr(nxt.sensor.hitechnic.ServoCon, "get_sample") - s = nxt.sensor.hitechnic.ServoCon(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.ServoCon, False) mdigital.read_value.side_effect = [ (1,), (43,), @@ -910,7 +912,7 @@ def test_servocon(self, mbrick, mdigital): def test_motorcon(self, mbrick, mdigital): assert not hasattr(nxt.sensor.hitechnic.MotorCon, "get_sample") - s = nxt.sensor.hitechnic.MotorCon(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.MotorCon, False) mdigital.read_value.side_effect = [ (123456,), (654321,), @@ -953,7 +955,7 @@ def test_angle(self, mbrick, mdigital): nxt.sensor.hitechnic.Angle.get_sample is nxt.sensor.hitechnic.Angle.get_angle ) - s = nxt.sensor.hitechnic.Angle(mbrick, Port.S1, False) + s = mbrick.get_sensor(Port.S1, nxt.sensor.hitechnic.Angle, False) mdigital.read_value.side_effect = [ (21, 1), (123456789,),