diff --git a/lib/src/binding_mqtt/mqtt_client.dart b/lib/src/binding_mqtt/mqtt_client.dart index c4274c0d..09c54ed4 100644 --- a/lib/src/binding_mqtt/mqtt_client.dart +++ b/lib/src/binding_mqtt/mqtt_client.dart @@ -21,7 +21,7 @@ import "mqtt_subscription.dart"; /// [ProtocolClient] for supporting the MQTT protocol. /// /// Currently, only MQTT version 3.1.1 is supported. -final class MqttClient extends ProtocolClient { +final class MqttClient extends ProtocolClient with MqttDiscoverer { /// Constructor. MqttClient({ MqttConfig? mqttConfig, @@ -197,4 +197,46 @@ final class MqttClient extends ProtocolClient { return MqttSubscription(form, client, complete, next: next, error: error); } + + @override + Stream performMqttDiscovery( + Uri brokerUri, { + required String discoveryTopic, + required String expectedContentType, + required Duration discoveryTimeout, + }) async* { + final client = await _connect(brokerUri, null); + + // TODO: Revisit QoS value and subscription check + if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) { + throw MqttBindingException( + "Subscription to topic $discoveryTopic failed", + ); + } + + final receivedMessageStream = client.updates; + if (receivedMessageStream == null) { + throw MqttBindingException( + "Subscription to topic $discoveryTopic failed", + ); + } + + Timer( + discoveryTimeout, + () async { + client.disconnect(); + }, + ); + + await for (final receivedMessageList in receivedMessageStream) { + for (final receivedMessage in receivedMessageList) { + final mqttMessage = receivedMessage.payload; + if (mqttMessage is MqttPublishMessage) { + final messagePayload = mqttMessage.payload.message; + + yield Content(expectedContentType, Stream.value(messagePayload)); + } + } + } + } } diff --git a/lib/src/core/implementation/discovery/discovery_configuration.dart b/lib/src/core/implementation/discovery/discovery_configuration.dart index a4829f7f..e706326d 100644 --- a/lib/src/core/implementation/discovery/discovery_configuration.dart +++ b/lib/src/core/implementation/discovery/discovery_configuration.dart @@ -138,6 +138,45 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration { final int? limit; } +/// Experimental [DiscoveryConfiguration] that is used to perform discovery with +/// the MQTT protocol. +@experimental +final class MqttDiscoveryConfiguration extends DiscoveryConfiguration { + /// Instantiates a new [DiscoveryConfiguration] for MQTT. + const MqttDiscoveryConfiguration( + this.brokerUri, { + this.discoveryTopic = "wot/td/#", + this.expectedContentType = "application/td+json", + this.discoveryTimeout = const Duration(seconds: 5), + }); + + /// [Uri] of the broker the + final Uri brokerUri; + + /// The topic that will be used for performing the discovery process. + /// + /// If a wildcard topic is used, then the discovery process may return more + /// than one TD. + /// + /// Defaults to `wot/td/#`. + final String discoveryTopic; + + /// The Thing Description content type that is expected during the discovery + /// process. + /// + /// Data that is received during the discovery process that is not + /// deserializable using the content type provided here will be ignored. + /// + /// Defaults to `application/td+json`. + final String expectedContentType; + + /// Time period after which the MQTT discovery process is going to be + /// cancelled. + /// + /// Defaults to five seconds. + final Duration discoveryTimeout; +} + /// Base class for configuring discovery mechanisms that involve a two-step /// approach. /// diff --git a/lib/src/core/implementation/thing_discovery.dart b/lib/src/core/implementation/thing_discovery.dart index 9277bde6..5c916daf 100644 --- a/lib/src/core/implementation/thing_discovery.dart +++ b/lib/src/core/implementation/thing_discovery.dart @@ -75,6 +75,18 @@ class ThingDiscovery extends Stream thingFilter: thingFilter, ); yield* thingDiscoveryProcess; + case MqttDiscoveryConfiguration( + :final brokerUri, + :final discoveryTopic, + :final expectedContentType, + :final discoveryTimeout, + ): + yield* _performMqttDiscovery( + brokerUri, + discoveryTopic, + expectedContentType, + discoveryTimeout, + ); } } } @@ -250,6 +262,34 @@ class ThingDiscovery extends Stream ); } + Stream _performMqttDiscovery( + Uri brokerUri, + String discoveryTopic, + String expectedContentType, + Duration discoveryTimeout, + ) async* { + final uriScheme = brokerUri.scheme; + final client = _clientForUriScheme(uriScheme); + + if (client is! MqttDiscoverer) { + yield* Stream.error( + DiscoveryException( + "Client for URI scheme $uriScheme does not support MQTT Discovery.", + ), + ); + return; + } + + final contentStream = client.performMqttDiscovery( + brokerUri, + discoveryTopic: discoveryTopic, + expectedContentType: expectedContentType, + discoveryTimeout: discoveryTimeout, + ); + + yield* _transformContentStreamToThingDescriptions(contentStream); + } + @override Future stop() async { final stopFutures = _clients.values.map((client) => client.stop()); diff --git a/lib/src/core/protocol_interfaces/protocol_discoverer.dart b/lib/src/core/protocol_interfaces/protocol_discoverer.dart index 770adefd..cc4dc787 100644 --- a/lib/src/core/protocol_interfaces/protocol_discoverer.dart +++ b/lib/src/core/protocol_interfaces/protocol_discoverer.dart @@ -45,3 +45,22 @@ base mixin CoreLinkFormatDiscoverer on ProtocolClient { @experimental Stream discoverWithCoreLinkFormat(Uri uri); } + +/// Interface for performing experimental discovery using the MQTT protocol. +@experimental +base mixin MqttDiscoverer on ProtocolClient { + /// Performs discovery of Thing Descriptions using the MQTT protocol via the + /// given [brokerUri]. + /// + /// By default, the [discoveryTopic] `wot/td/#` will be used as discussed in + /// [this issue]. + /// + /// [this issue]: https://github.com/w3c/wot-discovery/issues/134 + @experimental + Stream performMqttDiscovery( + Uri brokerUri, { + required String discoveryTopic, + required String expectedContentType, + required Duration discoveryTimeout, + }); +}