Skip to content

Commit

Permalink
feat!: rework discoverer APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed May 29, 2024
1 parent 669333a commit 1f0239b
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 174 deletions.
90 changes: 31 additions & 59 deletions lib/src/binding_coap/coap_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback(
}

/// A [ProtocolClient] for the Constrained Application Protocol (CoAP).
final class CoapClient extends ProtocolClient {
final class CoapClient extends ProtocolClient
with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [CoapClient] based on an optional [CoapConfig].
CoapClient({
CoapConfig? coapConfig,
Expand Down Expand Up @@ -446,59 +447,14 @@ final class CoapClient extends ProtocolClient {
@override
Future<void> stop() async {}

Stream<DiscoveryContent> _discoverFromMulticast(
coap.CoapClient client,
Uri uri,
) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}

Stream<DiscoveryContent> _discoverFromUnicast(
coap.CoapClient client,
Uri uri,
) async* {
yield await _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
final client = coap.CoapClient(uri);

if (uri.isMulticastAddress) {
if (!disableMulticast) {
yield* _discoverFromMulticast(client, uri);
}
} else {
yield* _discoverFromUnicast(client, uri);
}
}
Future<DiscoveryContent> discoverDirectly(Uri uri) async =>
_sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
Expand Down Expand Up @@ -534,10 +490,26 @@ final class CoapClient extends ProtocolClient {
}

@override
Future<Content> requestThingDescription(Uri url) async => _sendRequest(
url,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
);
Stream<Content> discoverViaMulticast(Uri uri) async* {
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendDiscoveryRequest(
uri,
coap.RequestMethod.get,
form: null,
accept: coap.CoapMediaType.applicationTdJson,
multicastResponseHandler: multicastResponseHandler,
);
unawaited(content);
yield* streamController.stream;
}
}
23 changes: 5 additions & 18 deletions lib/src/binding_http/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization";
/// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616
/// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750
/// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme
final class HttpClient extends ProtocolClient {
final class HttpClient extends ProtocolClient
with DirectDiscoverer, CoreLinkFormatDiscoverer {
/// Creates a new [HttpClient].
HttpClient({
AsyncClientSecurityCallback<BasicCredentials>? basicCredentialsCallback,
Expand Down Expand Up @@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Future<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
}) async {
final request = Request(HttpRequestMethod.get.methodName, uri);

yield await _sendDiscoveryRequest(
return _sendDiscoveryRequest(
request,
acceptHeaderValue: "application/td+json",
);
Expand All @@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient {

yield encodedLinks;
}

@override
Future<Content> requestThingDescription(Uri url) async {
final request = Request(HttpRequestMethod.get.methodName, url);
const tdContentType = "application/td+json";
request.headers["Accept"] = tdContentType;

final response = await _client.send(request);

return Content(
response.headers["Content-Type"] ?? tdContentType,
response.stream,
);
}
}
6 changes: 0 additions & 6 deletions lib/src/binding_mqtt/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,3 @@ const defaultKeepAlivePeriod = 20;
///
/// Evaluates to `'application/octet-stream'.
const defaultContentType = "application/octet-stream";

/// Content type used for the Content objects returned by discovery using MQTT.
///
/// Evaluates to `application/td+json`.
// TODO: Should probably be redefined globally
const discoveryContentType = "application/td+json";
69 changes: 26 additions & 43 deletions lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
/// [ProtocolClient] for supporting the MQTT protocol.
///
/// Currently, only MQTT version 3.1.1 is supported.
final class MqttClient extends ProtocolClient {
final class MqttClient extends ProtocolClient with MqttDiscoverer {
/// Constructor.
MqttClient({
MqttConfig? mqttConfig,
Expand Down Expand Up @@ -199,22 +199,12 @@ final class MqttClient extends ProtocolClient {
}

@override
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
Stream<Content> performMqttDiscovery(
Uri brokerUri, {
required String discoveryTopic,
required String expectedContentType,
}) async* {
final client = await _connect(uri, null);
const discoveryTopic = "wot/td/#";

final streamController = StreamController<DiscoveryContent>();

Timer(
_mqttConfig.discoveryTimeout,
() async {
client.disconnect();
await streamController.close();
},
);
final client = await _connect(brokerUri, null);

// TODO: Revisit QoS value and subscription check
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
Expand All @@ -223,36 +213,29 @@ final class MqttClient extends ProtocolClient {
);
}

client.updates?.listen(
(messages) {
for (final message in messages) {
final publishedMessage = message.payload as MqttPublishMessage;
final payload = publishedMessage.payload.message;

streamController.add(
DiscoveryContent(
discoveryContentType,
Stream.value(payload),
uri,
),
);
}
final receivedMessageStream = client.updates;
if (receivedMessageStream == null) {
throw MqttBindingException(
"Subscription to topic $discoveryTopic failed",
);
}

Timer(
_mqttConfig.discoveryTimeout,
() async {
client.disconnect();
},
cancelOnError: false,
);

yield* streamController.stream;
}
await for (final receivedMessageList in receivedMessageStream) {
for (final receivedMessage in receivedMessageList) {
final mqttMessage = receivedMessage.payload;
if (mqttMessage is MqttPublishMessage) {
final messagePayload = mqttMessage.payload.message;

@override
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
// TODO: implement discoverWithCoreLinkFormat
throw UnimplementedError();
}

@override
Future<Content> requestThingDescription(Uri url) {
// TODO: implement requestThingDescription
throw UnimplementedError();
yield Content(expectedContentType, Stream.value(messagePayload));
}
}
}
}
}
29 changes: 29 additions & 0 deletions lib/src/core/implementation/discovery/discovery_configuration.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,35 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration {
final int? limit;
}

/// Experimental [DiscoveryConfiguration] that is used to perform discovery with
/// the MQTT protocol.
@experimental
final class MqttDiscoveryConfiguration extends DiscoveryConfiguration {
/// Instantiates a new [DiscoveryConfiguration] for MQTT.
const MqttDiscoveryConfiguration(
this.brokerUri, {
this.discoveryTopic = "wot/td/#",
this.expectedContentType = "application/td+json",
});

/// [Uri] of the broker the
final Uri brokerUri;

/// The topic that will be used for performing the discovery process.
///
/// Defaults to `wot/td/#`.
final String discoveryTopic;

/// The content type that is expected by to be returned during the discovery
/// process.
///
/// Data that is received during the discovery process that is not
/// deserializable using the content type provided here will be ignored.
///
/// Defaults to `application/td+json`.
final String expectedContentType;
}

/// Base class for configuring discovery mechanisms that involve a two-step
/// approach.
///
Expand Down
9 changes: 8 additions & 1 deletion lib/src/core/implementation/servient.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import "../protocol_interfaces.dart";
import "../scripting_api.dart" as scripting_api;

import "consumed_thing.dart";
import "content.dart";
import "content_serdes.dart";
import "discovery/discovery_configuration.dart";
import "exposed_thing.dart";
Expand Down Expand Up @@ -281,7 +282,13 @@ class InternalServient implements Servient {
/// Requests a [ThingDescription] from a [url].
Future<ThingDescription> requestThingDescription(Uri url) async {
final client = clientFor(url.scheme);
final content = await client.requestThingDescription(url);
final Content content;

if (client is! DirectDiscoverer) {
throw Exception();
}

content = await client.discoverDirectly(url);

final dataSchemaValue = await contentSerdes.contentToValue(content, null);

Expand Down
44 changes: 44 additions & 0 deletions lib/src/core/implementation/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ class ThingDiscovery extends Stream<ThingDescription>
thingFilter: thingFilter,
);
yield* thingDiscoveryProcess;
case MqttDiscoveryConfiguration(
:final brokerUri,
:final discoveryTopic,
:final expectedContentType,
):
yield* _performMqttDiscovery(
brokerUri,
discoveryTopic,
expectedContentType,
);
}
}
}
Expand Down Expand Up @@ -192,6 +202,11 @@ class ThingDiscovery extends Stream<ThingDescription>
) async* {
final client = _clientForUriScheme(uri.scheme);

if (client is! CoreLinkFormatDiscoverer) {
yield* Stream.error(Exception());
return;
}

await for (final coreWebLink in client.discoverWithCoreLinkFormat(uri)) {
try {
final parsedUris = await _filterCoreWebLinks(resourceType, coreWebLink);
Expand Down Expand Up @@ -234,6 +249,35 @@ class ThingDiscovery extends Stream<ThingDescription>
);
}

Stream<ThingDescription> _performMqttDiscovery(
Uri brokerUri,
String discoveryTopic,
String expectedContentType,
) async* {
final client = _clientForUriScheme(brokerUri.scheme);

if (client is MqttDiscoverer) {
final contentStream = client.performMqttDiscovery(
brokerUri,
discoveryTopic: discoveryTopic,
expectedContentType: expectedContentType,
);

await for (final content in contentStream) {
try {
final dataSchemaValue =
await _servient.contentSerdes.contentToValue(content, null);

if (dataSchemaValue is scripting_api.ObjectValue) {
yield dataSchemaValue.value.toThingDescription();
}
} on Exception {
continue;
}
}
}
}

@override
Future<void> stop() async {
final stopFutures = _clients.values.map((client) => client.stop());
Expand Down
1 change: 1 addition & 0 deletions lib/src/core/protocol_interfaces.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@

export "protocol_interfaces/protocol_client.dart";
export "protocol_interfaces/protocol_client_factory.dart";
export "protocol_interfaces/protocol_discoverer.dart";
export "protocol_interfaces/protocol_server.dart";
export "protocol_interfaces/protocol_subscription.dart";
Loading

0 comments on commit 1f0239b

Please sign in to comment.