Skip to content

Commit

Permalink
feat: introduce new MQTT discovery method
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed May 29, 2024
1 parent 44bb791 commit fc3cc5c
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 1 deletion.
44 changes: 43 additions & 1 deletion lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
/// [ProtocolClient] for supporting the MQTT protocol.
///
/// Currently, only MQTT version 3.1.1 is supported.
final class MqttClient extends ProtocolClient {
final class MqttClient extends ProtocolClient with MqttDiscoverer {
/// Constructor.
MqttClient({
MqttConfig? mqttConfig,
Expand Down Expand Up @@ -197,4 +197,46 @@ final class MqttClient extends ProtocolClient {

return MqttSubscription(form, client, complete, next: next, error: error);
}

@override
Stream<Content> performMqttDiscovery(
Uri brokerUri, {
required String discoveryTopic,
required String expectedContentType,
required Duration discoveryTimeout,
}) async* {
final client = await _connect(brokerUri, null);

// TODO: Revisit QoS value and subscription check
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
throw MqttBindingException(
"Subscription to topic $discoveryTopic failed",
);
}

final receivedMessageStream = client.updates;
if (receivedMessageStream == null) {
throw MqttBindingException(
"Subscription to topic $discoveryTopic failed",
);
}

Timer(
discoveryTimeout,
() async {
client.disconnect();
},
);

await for (final receivedMessageList in receivedMessageStream) {
for (final receivedMessage in receivedMessageList) {
final mqttMessage = receivedMessage.payload;
if (mqttMessage is MqttPublishMessage) {
final messagePayload = mqttMessage.payload.message;

yield Content(expectedContentType, Stream.value(messagePayload));
}
}
}
}
}
39 changes: 39 additions & 0 deletions lib/src/core/implementation/discovery/discovery_configuration.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,45 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration {
final int? limit;
}

/// Experimental [DiscoveryConfiguration] that is used to perform discovery with
/// the MQTT protocol.
@experimental
final class MqttDiscoveryConfiguration extends DiscoveryConfiguration {
/// Instantiates a new [DiscoveryConfiguration] for MQTT.
const MqttDiscoveryConfiguration(
this.brokerUri, {
this.discoveryTopic = "wot/td/#",
this.expectedContentType = "application/td+json",
this.discoveryTimeout = const Duration(seconds: 5),
});

/// [Uri] of the broker the
final Uri brokerUri;

/// The topic that will be used for performing the discovery process.
///
/// If a wildcard topic is used, then the discovery process may return more
/// than one TD.
///
/// Defaults to `wot/td/#`.
final String discoveryTopic;

/// The Thing Description content type that is expected during the discovery
/// process.
///
/// Data that is received during the discovery process that is not
/// deserializable using the content type provided here will be ignored.
///
/// Defaults to `application/td+json`.
final String expectedContentType;

/// Time period after which the MQTT discovery process is going to be
/// cancelled.
///
/// Defaults to five seconds.
final Duration discoveryTimeout;
}

/// Base class for configuring discovery mechanisms that involve a two-step
/// approach.
///
Expand Down
40 changes: 40 additions & 0 deletions lib/src/core/implementation/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ class ThingDiscovery extends Stream<ThingDescription>
thingFilter: thingFilter,
);
yield* thingDiscoveryProcess;
case MqttDiscoveryConfiguration(
:final brokerUri,
:final discoveryTopic,
:final expectedContentType,
:final discoveryTimeout,
):
yield* _performMqttDiscovery(
brokerUri,
discoveryTopic,
expectedContentType,
discoveryTimeout,
);
}
}
}
Expand Down Expand Up @@ -250,6 +262,34 @@ class ThingDiscovery extends Stream<ThingDescription>
);
}

Stream<ThingDescription> _performMqttDiscovery(
Uri brokerUri,
String discoveryTopic,
String expectedContentType,
Duration discoveryTimeout,
) async* {
final uriScheme = brokerUri.scheme;
final client = _clientForUriScheme(uriScheme);

if (client is! MqttDiscoverer) {
yield* Stream.error(
DiscoveryException(
"Client for URI scheme $uriScheme does not support MQTT Discovery.",
),
);
return;
}

final contentStream = client.performMqttDiscovery(
brokerUri,
discoveryTopic: discoveryTopic,
expectedContentType: expectedContentType,
discoveryTimeout: discoveryTimeout,
);

yield* _transformContentStreamToThingDescriptions(contentStream);
}

@override
Future<void> stop() async {
final stopFutures = _clients.values.map((client) => client.stop());
Expand Down
19 changes: 19 additions & 0 deletions lib/src/core/protocol_interfaces/protocol_discoverer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,22 @@ base mixin CoreLinkFormatDiscoverer on ProtocolClient {
@experimental
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri);
}

/// Interface for performing experimental discovery using the MQTT protocol.
@experimental
base mixin MqttDiscoverer on ProtocolClient {
/// Performs discovery of Thing Descriptions using the MQTT protocol via the
/// given [brokerUri].
///
/// By default, the [discoveryTopic] `wot/td/#` will be used as discussed in
/// [this issue].
///
/// [this issue]: https://github.com/w3c/wot-discovery/issues/134
@experimental
Stream<Content> performMqttDiscovery(
Uri brokerUri, {
required String discoveryTopic,
required String expectedContentType,
required Duration discoveryTimeout,
});
}

0 comments on commit fc3cc5c

Please sign in to comment.