Skip to content

Commit

Permalink
Merge pull request #167 from eclipse-thingweb/discovery-interfaces
Browse files Browse the repository at this point in the history
feat!: rework discoverer APIs
  • Loading branch information
JKRhb committed May 29, 2024
2 parents 669333a + 77e39d6 commit 5fd0f1c
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 202 deletions.
2 changes: 2 additions & 0 deletions lib/core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
/// runtime used for consuming, exposing, and discovering Things.
library core;

// TODO(JKRhb): Reorganize top-level core package into smaller packages.
export "src/core/definitions.dart";
export "src/core/exceptions.dart";
export "src/core/extensions.dart";
export "src/core/implementation.dart";
export "src/core/protocol_interfaces.dart";
export "src/core/scripting_api.dart";
94 changes: 33 additions & 61 deletions lib/src/binding_coap/coap_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback(
}

/// A [ProtocolClient] for the Constrained Application Protocol (CoAP).
final class CoapClient extends ProtocolClient {
final class CoapClient extends ProtocolClient
with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [CoapClient] based on an optional [CoapConfig].
CoapClient({
CoapConfig? coapConfig,
Expand Down Expand Up @@ -446,67 +447,22 @@ final class CoapClient extends ProtocolClient {
@override
Future<void> stop() async {}

Stream<DiscoveryContent> _discoverFromMulticast(
coap.CoapClient client,
Uri uri,
) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}

Stream<DiscoveryContent> _discoverFromUnicast(
coap.CoapClient client,
Uri uri,
) async* {
yield await _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
final client = coap.CoapClient(uri);

if (uri.isMulticastAddress) {
if (!disableMulticast) {
yield* _discoverFromMulticast(client, uri);
}
} else {
yield* _discoverFromUnicast(client, uri);
}
}
Future<DiscoveryContent> discoverDirectly(Uri uri) async =>
_sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
coap.CoapMulticastResponseHandler? multicastResponseHandler;
final streamController = StreamController<DiscoveryContent>();

// TODO: Replace once https://github.com/shamblett/coap/pull/129 is merged
if (uri.isMulticastAddress) {
if (uri.hasMulticastAddress) {
multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
Expand All @@ -526,18 +482,34 @@ final class CoapClient extends ProtocolClient {
multicastResponseHandler: multicastResponseHandler,
);

if (uri.isMulticastAddress) {
if (uri.hasMulticastAddress) {
yield* streamController.stream;
} else {
yield content;
}
}

@override
Future<Content> requestThingDescription(Uri url) async => _sendRequest(
url,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
Stream<Content> discoverViaMulticast(Uri uri) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}
}
10 changes: 0 additions & 10 deletions lib/src/binding_coap/coap_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//
// SPDX-License-Identifier: BSD-3-Clause

import "dart:io";
import "dart:typed_data";

import "package:cbor/cbor.dart";
Expand All @@ -16,15 +15,6 @@ import "../../core.dart" hide PskCredentials;
import "coap_binding_exception.dart";
import "coap_definitions.dart";

/// Extension which makes it easier to handle [Uri]s containing
/// [InternetAddress]es.
extension InternetAddressMethods on Uri {
/// Checks whether the host of this [Uri] is a multicast [InternetAddress].
bool get isMulticastAddress {
return InternetAddress.tryParse(host)?.isMulticast ?? false;
}
}

/// CoAP-specific extensions for the [AugmentedForm] class.
extension CoapFormExtension on AugmentedForm {
T? _obtainVocabularyTerm<T>(String vocabularyTerm) {
Expand Down
23 changes: 5 additions & 18 deletions lib/src/binding_http/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization";
/// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616
/// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750
/// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme
final class HttpClient extends ProtocolClient {
final class HttpClient extends ProtocolClient
with DirectDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [HttpClient].
HttpClient({
AsyncClientSecurityCallback<BasicCredentials>? basicCredentialsCallback,
Expand Down Expand Up @@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Future<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
}) async {
final request = Request(HttpRequestMethod.get.methodName, uri);

yield await _sendDiscoveryRequest(
return _sendDiscoveryRequest(
request,
acceptHeaderValue: "application/td+json",
);
Expand All @@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient {

yield encodedLinks;
}

@override
Future<Content> requestThingDescription(Uri url) async {
final request = Request(HttpRequestMethod.get.methodName, url);
const tdContentType = "application/td+json";
request.headers["Accept"] = tdContentType;

final response = await _client.send(request);

return Content(
response.headers["Content-Type"] ?? tdContentType,
response.stream,
);
}
}
10 changes: 2 additions & 8 deletions lib/src/binding_mqtt/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const mqttContextUri = "http://www.example.org/mqtt-binding#";
/// The default prefix used in MQTT-related compact URIs (CURIEs) in TDs.
const defaultMqttPrefix = "mqv";

/// Default timeout length used for reading properties and discovering TDs.
const defaultTimeout = Duration(seconds: 10);
/// Default timeout length used for reading properties.
const defaultReadTimeout = Duration(seconds: 10);

/// Default duration MQTT connections are kept alive in seconds.
const defaultKeepAlivePeriod = 20;
Expand All @@ -43,9 +43,3 @@ const defaultKeepAlivePeriod = 20;
///
/// Evaluates to `'application/octet-stream'.
const defaultContentType = "application/octet-stream";

/// Content type used for the Content objects returned by discovery using MQTT.
///
/// Evaluates to `application/td+json`.
// TODO: Should probably be redefined globally
const discoveryContentType = "application/td+json";
70 changes: 27 additions & 43 deletions lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
/// [ProtocolClient] for supporting the MQTT protocol.
///
/// Currently, only MQTT version 3.1.1 is supported.
final class MqttClient extends ProtocolClient {
final class MqttClient extends ProtocolClient with MqttDiscoverer {
/// Constructor.
MqttClient({
MqttConfig? mqttConfig,
Expand Down Expand Up @@ -199,22 +199,13 @@ final class MqttClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
Stream<Content> performMqttDiscovery(
Uri brokerUri, {
required String discoveryTopic,
required String expectedContentType,
required Duration discoveryTimeout,
}) async* {
final client = await _connect(uri, null);
const discoveryTopic = "wot/td/#";

final streamController = StreamController<DiscoveryContent>();

Timer(
_mqttConfig.discoveryTimeout,
() async {
client.disconnect();
await streamController.close();
},
);
final client = await _connect(brokerUri, null);

// TODO: Revisit QoS value and subscription check
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
Expand All @@ -223,36 +214,29 @@ final class MqttClient extends ProtocolClient {
);
}

client.updates?.listen(
(messages) {
for (final message in messages) {
final publishedMessage = message.payload as MqttPublishMessage;
final payload = publishedMessage.payload.message;

streamController.add(
DiscoveryContent(
discoveryContentType,
Stream.value(payload),
uri,
),
);
}
final receivedMessageStream = client.updates;
if (receivedMessageStream == null) {
throw MqttBindingException(
"Subscription to topic $discoveryTopic failed",
);
}

Timer(
discoveryTimeout,
() async {
client.disconnect();
},
cancelOnError: false,
);

yield* streamController.stream;
}

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
// TODO: implement discoverWithCoreLinkFormat
throw UnimplementedError();
}
await for (final receivedMessageList in receivedMessageStream) {
for (final receivedMessage in receivedMessageList) {
final mqttMessage = receivedMessage.payload;
if (mqttMessage is MqttPublishMessage) {
final messagePayload = mqttMessage.payload.message;

@override
Future<Content> requestThingDescription(Uri url) {
// TODO: implement requestThingDescription
throw UnimplementedError();
yield Content(expectedContentType, Stream.value(messagePayload));
}
}
}
}
}
15 changes: 5 additions & 10 deletions lib/src/binding_mqtt/mqtt_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ import "constants.dart";
/// The default [QoS] values for the different operation types will be used if
/// no Quality of Service is defined in the respective form.
///
/// If no [readTimeout] or [discoveryTimeout] is defined, a [defaultTimeout] of
/// 10 seconds will be used. Furthermore, the [keepAlivePeriod] defaults to 20
/// seconds.
/// If no [readTimeout] is defined, a [defaultReadTimeout] of
/// 10 seconds will be used.
/// Furthermore, the [keepAlivePeriod] defaults to a [defaultKeepAlivePeriod] of
/// 20 seconds.
class MqttConfig {
/// Creates a new [MqttConfig] object.
MqttConfig({
this.defaultReadQoS = QoS.atMostOnce,
this.defaultWriteQoS = QoS.atMostOnce,
this.defaultActionQoS = QoS.atMostOnce,
this.defaultSubscribeQoS = QoS.atLeastOnce,
this.readTimeout = defaultTimeout,
this.discoveryTimeout = defaultTimeout,
this.readTimeout = defaultReadTimeout,
this.keepAlivePeriod = defaultKeepAlivePeriod,
});

Expand All @@ -50,11 +50,6 @@ class MqttConfig {
/// If no value has been read until the timeout has expired, the operation
/// will be canceled.
final Duration readTimeout;

/// Timeout value used for discovery using MQTT.
///
/// The discovery process will be aborted once the timeout has expired.
final Duration discoveryTimeout;
}

/// Enum for indicating the default Quality of Service (QoS) that should be used
Expand Down
10 changes: 10 additions & 0 deletions lib/src/core/extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// SPDX-License-Identifier: BSD-3-Clause

/// Sub-library for extensions used by `dart_wot`.
library extensions;

export "extensions/uri_extensions.dart";
16 changes: 16 additions & 0 deletions lib/src/core/extensions/uri_extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// SPDX-License-Identifier: BSD-3-Clause

import "dart:io";

/// Extension that makes it easier to handle [Uri]s which potentially contain
/// [InternetAddress]es.
extension InternetAddressMethodExtension on Uri {
/// Checks whether the host of this [Uri] is a multicast [InternetAddress].
bool get hasMulticastAddress {
return InternetAddress.tryParse(host)?.isMulticast ?? false;
}
}
Loading

0 comments on commit 5fd0f1c

Please sign in to comment.