Skip to content

Commit

Permalink
feat!: rework discoverer APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed May 29, 2024
1 parent b615ce6 commit bcf6984
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 186 deletions.
90 changes: 31 additions & 59 deletions lib/src/binding_coap/coap_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback(
}

/// A [ProtocolClient] for the Constrained Application Protocol (CoAP).
final class CoapClient extends ProtocolClient {
final class CoapClient extends ProtocolClient
with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [CoapClient] based on an optional [CoapConfig].
CoapClient({
CoapConfig? coapConfig,
Expand Down Expand Up @@ -446,59 +447,14 @@ final class CoapClient extends ProtocolClient {
@override
Future<void> stop() async {}

Stream<DiscoveryContent> _discoverFromMulticast(
coap.CoapClient client,
Uri uri,
) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}

Stream<DiscoveryContent> _discoverFromUnicast(
coap.CoapClient client,
Uri uri,
) async* {
yield await _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
final client = coap.CoapClient(uri);

if (uri.isMulticastAddress) {
if (!disableMulticast) {
yield* _discoverFromMulticast(client, uri);
}
} else {
yield* _discoverFromUnicast(client, uri);
}
}
Future<DiscoveryContent> discoverDirectly(Uri uri) async =>
_sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
Expand Down Expand Up @@ -534,10 +490,26 @@ final class CoapClient extends ProtocolClient {
}

@override
Future<Content> requestThingDescription(Uri url) async => _sendRequest(
url,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
Stream<Content> discoverViaMulticast(Uri uri) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}
}
23 changes: 5 additions & 18 deletions lib/src/binding_http/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization";
/// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616
/// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750
/// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme
final class HttpClient extends ProtocolClient {
final class HttpClient extends ProtocolClient
with DirectDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [HttpClient].
HttpClient({
AsyncClientSecurityCallback<BasicCredentials>? basicCredentialsCallback,
Expand Down Expand Up @@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Future<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
}) async {
final request = Request(HttpRequestMethod.get.methodName, uri);

yield await _sendDiscoveryRequest(
return _sendDiscoveryRequest(
request,
acceptHeaderValue: "application/td+json",
);
Expand All @@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient {

yield encodedLinks;
}

@override
Future<Content> requestThingDescription(Uri url) async {
final request = Request(HttpRequestMethod.get.methodName, url);
const tdContentType = "application/td+json";
request.headers["Accept"] = tdContentType;

final response = await _client.send(request);

return Content(
response.headers["Content-Type"] ?? tdContentType,
response.stream,
);
}
}
10 changes: 2 additions & 8 deletions lib/src/binding_mqtt/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const mqttContextUri = "http://www.example.org/mqtt-binding#";
/// The default prefix used in MQTT-related compact URIs (CURIEs) in TDs.
const defaultMqttPrefix = "mqv";

/// Default timeout length used for reading properties and discovering TDs.
const defaultTimeout = Duration(seconds: 10);
/// Default timeout length used for reading properties.
const defaultReadTimeout = Duration(seconds: 10);

/// Default duration MQTT connections are kept alive in seconds.
const defaultKeepAlivePeriod = 20;
Expand All @@ -43,9 +43,3 @@ const defaultKeepAlivePeriod = 20;
///
/// Evaluates to `'application/octet-stream'.
const defaultContentType = "application/octet-stream";

/// Content type used for the Content objects returned by discovery using MQTT.
///
/// Evaluates to `application/td+json`.
// TODO: Should probably be redefined globally
const discoveryContentType = "application/td+json";
70 changes: 27 additions & 43 deletions lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
/// [ProtocolClient] for supporting the MQTT protocol.
///
/// Currently, only MQTT version 3.1.1 is supported.
final class MqttClient extends ProtocolClient {
final class MqttClient extends ProtocolClient with MqttDiscoverer {
/// Constructor.
MqttClient({
MqttConfig? mqttConfig,
Expand Down Expand Up @@ -199,22 +199,13 @@ final class MqttClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
Stream<Content> performMqttDiscovery(
Uri brokerUri, {
required String discoveryTopic,
required String expectedContentType,
required Duration discoveryTimeout,
}) async* {
final client = await _connect(uri, null);
const discoveryTopic = "wot/td/#";

final streamController = StreamController<DiscoveryContent>();

Timer(
_mqttConfig.discoveryTimeout,
() async {
client.disconnect();
await streamController.close();
},
);
final client = await _connect(brokerUri, null);

// TODO: Revisit QoS value and subscription check
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
Expand All @@ -223,36 +214,29 @@ final class MqttClient extends ProtocolClient {
);
}

client.updates?.listen(
(messages) {
for (final message in messages) {
final publishedMessage = message.payload as MqttPublishMessage;
final payload = publishedMessage.payload.message;

streamController.add(
DiscoveryContent(
discoveryContentType,
Stream.value(payload),
uri,
),
);
}
final receivedMessageStream = client.updates;
if (receivedMessageStream == null) {
throw MqttBindingException(
"Subscription to topic $discoveryTopic failed",
);
}

Timer(
discoveryTimeout,
() async {
client.disconnect();
},
cancelOnError: false,
);

yield* streamController.stream;
}

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
// TODO: implement discoverWithCoreLinkFormat
throw UnimplementedError();
}
await for (final receivedMessageList in receivedMessageStream) {
for (final receivedMessage in receivedMessageList) {
final mqttMessage = receivedMessage.payload;
if (mqttMessage is MqttPublishMessage) {
final messagePayload = mqttMessage.payload.message;

@override
Future<Content> requestThingDescription(Uri url) {
// TODO: implement requestThingDescription
throw UnimplementedError();
yield Content(expectedContentType, Stream.value(messagePayload));
}
}
}
}
}
10 changes: 2 additions & 8 deletions lib/src/binding_mqtt/mqtt_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import "constants.dart";
/// The default [QoS] values for the different operation types will be used if
/// no Quality of Service is defined in the respective form.
///
/// If no [readTimeout] or [discoveryTimeout] is defined, a [defaultTimeout] of
/// If no [readTimeout] is defined, a default timeout of
/// 10 seconds will be used. Furthermore, the [keepAlivePeriod] defaults to 20
/// seconds.
class MqttConfig {
Expand All @@ -24,8 +24,7 @@ class MqttConfig {
this.defaultWriteQoS = QoS.atMostOnce,
this.defaultActionQoS = QoS.atMostOnce,
this.defaultSubscribeQoS = QoS.atLeastOnce,
this.readTimeout = defaultTimeout,
this.discoveryTimeout = defaultTimeout,
this.readTimeout = defaultReadTimeout,
this.keepAlivePeriod = defaultKeepAlivePeriod,
});

Expand All @@ -50,11 +49,6 @@ class MqttConfig {
/// If no value has been read until the timeout has expired, the operation
/// will be canceled.
final Duration readTimeout;

/// Timeout value used for discovery using MQTT.
///
/// The discovery process will be aborted once the timeout has expired.
final Duration discoveryTimeout;
}

/// Enum for indicating the default Quality of Service (QoS) that should be used
Expand Down
39 changes: 39 additions & 0 deletions lib/src/core/implementation/discovery/discovery_configuration.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,45 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration {
final int? limit;
}

/// Experimental [DiscoveryConfiguration] that is used to perform discovery with
/// the MQTT protocol.
@experimental
final class MqttDiscoveryConfiguration extends DiscoveryConfiguration {
/// Instantiates a new [DiscoveryConfiguration] for MQTT.
const MqttDiscoveryConfiguration(
this.brokerUri, {
this.discoveryTopic = "wot/td/#",
this.expectedContentType = "application/td+json",
this.discoveryTimeout = const Duration(seconds: 5),
});

/// [Uri] of the broker the
final Uri brokerUri;

/// The topic that will be used for performing the discovery process.
///
/// If a wildcard topic is used, then the discovery process may return more
/// than one TD.
///
/// Defaults to `wot/td/#`.
final String discoveryTopic;

/// The Thing Description content type that is expected during the discovery
/// process.
///
/// Data that is received during the discovery process that is not
/// deserializable using the content type provided here will be ignored.
///
/// Defaults to `application/td+json`.
final String expectedContentType;

/// Time period after which the MQTT discovery process is going to be
/// cancelled.
///
/// Defaults to five seconds.
final Duration discoveryTimeout;
}

/// Base class for configuring discovery mechanisms that involve a two-step
/// approach.
///
Expand Down
9 changes: 8 additions & 1 deletion lib/src/core/implementation/servient.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import "../protocol_interfaces.dart";
import "../scripting_api.dart" as scripting_api;

import "consumed_thing.dart";
import "content.dart";
import "content_serdes.dart";
import "discovery/discovery_configuration.dart";
import "exposed_thing.dart";
Expand Down Expand Up @@ -281,7 +282,13 @@ class InternalServient implements Servient {
/// Requests a [ThingDescription] from a [url].
Future<ThingDescription> requestThingDescription(Uri url) async {
final client = clientFor(url.scheme);
final content = await client.requestThingDescription(url);
final Content content;

if (client is! DirectDiscoverer) {
throw Exception();
}

content = await client.discoverDirectly(url);

final dataSchemaValue = await contentSerdes.contentToValue(content, null);

Expand Down
Loading

0 comments on commit bcf6984

Please sign in to comment.