diff --git a/lib/src/binding_coap/coap_subscription.dart b/lib/src/binding_coap/coap_subscription.dart index 9a63c012..6a6be784 100644 --- a/lib/src/binding_coap/coap_subscription.dart +++ b/lib/src/binding_coap/coap_subscription.dart @@ -8,15 +8,16 @@ import "package:coap/coap.dart"; import "../../core.dart"; -/// [Subscription] to a CoAP resource, based on the observe option ([RFC 7641]). +/// [ProtocolSubscription] to a CoAP resource, based on the observe option +/// ([RFC 7641]). /// /// [RFC 7641]: https://datatracker.ietf.org/doc/html/rfc7641 -class CoapSubscription implements Subscription { +final class CoapSubscription extends ProtocolSubscription { /// Constructor CoapSubscription( this._coapClient, this._observeClientRelation, - this._complete, + super._complete, ) : _active = true; final CoapClient _coapClient; @@ -28,10 +29,6 @@ class CoapSubscription implements Subscription { @override bool get active => _active; - /// Callback used to pass by the servient that is used to signal it that an - /// observation has been cancelled. - final void Function() _complete; - @override Future stop({ int? formIndex, @@ -48,6 +45,7 @@ class CoapSubscription implements Subscription { await _coapClient.cancelObserveProactive(observeClientRelation); } _coapClient.close(); - _complete(); + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); } } diff --git a/lib/src/binding_mqtt/mqtt_subscription.dart b/lib/src/binding_mqtt/mqtt_subscription.dart index 5588c13e..e3f8fa55 100644 --- a/lib/src/binding_mqtt/mqtt_subscription.dart +++ b/lib/src/binding_mqtt/mqtt_subscription.dart @@ -9,13 +9,13 @@ import "package:mqtt_client/mqtt_server_client.dart"; import "../../core.dart"; -/// [Subscription] for the MQTT protocol. -class MqttSubscription implements Subscription { +/// [ProtocolSubscription] for the MQTT protocol. +final class MqttSubscription extends ProtocolSubscription { /// Constructor. MqttSubscription( this._form, this._client, - this._complete, { + super._complete, { required void Function(Content content) next, void Function(Exception error)? error, }) : _active = true { @@ -52,8 +52,6 @@ class MqttSubscription implements Subscription { bool _active = true; - final void Function() _complete; - @override bool get active => _active; @@ -65,6 +63,7 @@ class MqttSubscription implements Subscription { }) async { _client.disconnect(); _active = false; - _complete(); + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); } } diff --git a/lib/src/core/implementation/consumed_thing.dart b/lib/src/core/implementation/consumed_thing.dart index f52f1c5a..4e72fa81 100644 --- a/lib/src/core/implementation/consumed_thing.dart +++ b/lib/src/core/implementation/consumed_thing.dart @@ -29,9 +29,9 @@ class ConsumedThing implements scripting_api.ConsumedThing { /// The [title] of the Thing. final String title; - final Map _subscribedEvents = {}; + final Set _subscribedEvents = {}; - final Map _observedProperties = {}; + final Set _observedProperties = {}; /// Determines the id of this [ConsumedThing]. String get identifier => thingDescription.identifier; @@ -229,7 +229,7 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } - if (_observedProperties.containsKey(propertyName)) { + if (_observedProperties.contains(propertyName)) { throw StateError( "ConsumedThing '$title' already has a function " "subscribed to $propertyName. You can only observe once", @@ -259,15 +259,12 @@ class ConsumedThing implements scripting_api.ConsumedThing { required Map? uriVariables, }) async { final OperationType operationType; - final Map subscriptions; switch (subscriptionType) { case scripting_api.SubscriptionType.property: operationType = OperationType.observeproperty; - subscriptions = _observedProperties; case scripting_api.SubscriptionType.event: operationType = OperationType.subscribeevent; - subscriptions = _subscribedEvents; } final (client, form) = _getClientFor( @@ -288,17 +285,10 @@ class ConsumedThing implements scripting_api.ConsumedThing { onError(error); } }, - complete: () => removeSubscription(affordanceName, subscriptionType), + complete: () => _removeSubscription(affordanceName, subscriptionType), ); - switch (subscriptionType) { - case scripting_api.SubscriptionType.property: - _observedProperties[affordanceName] = subscription; - case scripting_api.SubscriptionType.event: - _subscribedEvents[affordanceName] = subscription; - } - - subscriptions[affordanceName] = subscription; + _addSubscription(affordanceName, subscriptionType); return subscription; } @@ -376,7 +366,7 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } - if (_subscribedEvents.containsKey(eventName)) { + if (_subscribedEvents.contains(eventName)) { throw DartWotException( "ConsumedThing '$title' already has a function " "subscribed to $eventName. You can only subscribe once.", @@ -407,31 +397,25 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } - /// Removes a subscription with a specified [key] and [type]. - void removeSubscription(String key, scripting_api.SubscriptionType type) { + void _addSubscription( + String key, + scripting_api.SubscriptionType type, + ) { switch (type) { case scripting_api.SubscriptionType.property: - _observedProperties.remove(key); + _observedProperties.add(key); case scripting_api.SubscriptionType.event: - _subscribedEvents.remove(key); + _subscribedEvents.add(key); } } - /// Cleans up the resources used by this [ConsumedThing]. - bool destroy({bool external = true}) { - for (final observedProperty in _observedProperties.values) { - observedProperty.stop(); - } - _observedProperties.clear(); - for (final subscribedEvent in _subscribedEvents.values) { - subscribedEvent.stop(); - } - _subscribedEvents.clear(); - - if (external) { - return servient.deregisterConsumedThing(this); + /// Removes a subscription with a specified [key] and [type]. + void _removeSubscription(String key, scripting_api.SubscriptionType type) { + switch (type) { + case scripting_api.SubscriptionType.property: + _observedProperties.remove(key); + case scripting_api.SubscriptionType.event: + _subscribedEvents.remove(key); } - - return false; } } diff --git a/lib/src/core/implementation/servient.dart b/lib/src/core/implementation/servient.dart index aab1bfca..ae7f6b4d 100644 --- a/lib/src/core/implementation/servient.dart +++ b/lib/src/core/implementation/servient.dart @@ -97,7 +97,6 @@ class InternalServient implements Servient { final List _servers = []; final Map _clientFactories = {}; final Map _things = {}; - final Set _consumedThings = {}; final ServerSecurityCallback? _serverSecurityCallback; @@ -127,10 +126,6 @@ class InternalServient implements Servient { clientFactory.destroy(); } _clientFactories.clear(); - for (final consumedThing in _consumedThings) { - consumedThing.destroy(); - } - _consumedThings.clear(); final serverStatuses = _servers.map((server) => server.stop()).toList(); await Future.wait(serverStatuses); @@ -177,29 +172,6 @@ class InternalServient implements Servient { return true; } - /// Removes and cleans up the resources of a [ConsumedThing]. - /// - /// If the [ConsumedThing] has not been registered before, `false` is - /// returned, otherwise `true`. - bool destroyConsumedThing(ConsumedThing consumedThing) { - return consumedThing.destroy(external: false); - } - - /// De-registers the given [consumedThing]. - /// - /// If the [ConsumedThing] has not been registered before, `false` is - /// returned, otherwise `true`. - bool deregisterConsumedThing(ConsumedThing consumedThing) { - return _consumedThings.remove(consumedThing); - } - - /// Adds a [ConsumedThing] to the servient if it hasn't been registered - /// before. - /// - /// Returns `false` if the [thing] has already been registered, otherwise - /// `true`. - bool addConsumedThing(ConsumedThing thing) => _consumedThings.add(thing); - /// Returns an [ExposedThing] with the given [id] if it has been registered. ExposedThing? thing(String id) => _things[id]; @@ -270,12 +242,8 @@ class InternalServient implements Servient { /// Consumes a [ThingDescription] and returns a [scripting_api.ConsumedThing]. Future consume( ThingDescription thingDescription, - ) async { - final newThing = ConsumedThing(this, thingDescription); - addConsumedThing(newThing); - - return newThing; - } + ) async => + ConsumedThing(this, thingDescription); /// Exposes a Thing based on an [scripting_api.ExposedThingInit]. Future produce( diff --git a/lib/src/core/protocol_interfaces.dart b/lib/src/core/protocol_interfaces.dart index a7b7fa00..652d757c 100644 --- a/lib/src/core/protocol_interfaces.dart +++ b/lib/src/core/protocol_interfaces.dart @@ -7,3 +7,4 @@ export "protocol_interfaces/protocol_client.dart"; export "protocol_interfaces/protocol_client_factory.dart"; export "protocol_interfaces/protocol_server.dart"; +export "protocol_interfaces/protocol_subscription.dart"; diff --git a/lib/src/core/protocol_interfaces/protocol_subscription.dart b/lib/src/core/protocol_interfaces/protocol_subscription.dart new file mode 100644 index 00000000..8230b1d3 --- /dev/null +++ b/lib/src/core/protocol_interfaces/protocol_subscription.dart @@ -0,0 +1,30 @@ +// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// SPDX-License-Identifier: BSD-3-Clause + +import "package:meta/meta.dart"; + +import "../scripting_api.dart"; + +/// Base class for implementations of the [Subscription] interface. +abstract base class ProtocolSubscription implements Subscription { + /// Instantiates a new [ProtocolSubscription]. + /// + /// The [_complete] callback will be called when the [ProtocolSubscription] + /// has been [stop]ped (either internally or externally). + ProtocolSubscription(this._complete); + + final void Function() _complete; + + @override + @mustCallSuper + Future stop({ + int? formIndex, + Map? uriVariables, + Object? data, + }) async { + _complete(); + } +}