Skip to content

Commit

Permalink
Merge pull request #162 from eclipse-thingweb/refactor-structure
Browse files Browse the repository at this point in the history
Rework clean up of ConsumedThing
  • Loading branch information
JKRhb committed May 24, 2024
2 parents 37fbb28 + aab9e9b commit 9255bea
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 83 deletions.
14 changes: 6 additions & 8 deletions lib/src/binding_coap/coap_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import "package:coap/coap.dart";

import "../../core.dart";

/// [Subscription] to a CoAP resource, based on the observe option ([RFC 7641]).
/// [ProtocolSubscription] to a CoAP resource, based on the observe option
/// ([RFC 7641]).
///
/// [RFC 7641]: https://datatracker.ietf.org/doc/html/rfc7641
class CoapSubscription implements Subscription {
final class CoapSubscription extends ProtocolSubscription {
/// Constructor
CoapSubscription(
this._coapClient,
this._observeClientRelation,
this._complete,
super._complete,
) : _active = true;

final CoapClient _coapClient;
Expand All @@ -28,10 +29,6 @@ class CoapSubscription implements Subscription {
@override
bool get active => _active;

/// Callback used to pass by the servient that is used to signal it that an
/// observation has been cancelled.
final void Function() _complete;

@override
Future<void> stop({
int? formIndex,
Expand All @@ -48,6 +45,7 @@ class CoapSubscription implements Subscription {
await _coapClient.cancelObserveProactive(observeClientRelation);
}
_coapClient.close();
_complete();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
11 changes: 5 additions & 6 deletions lib/src/binding_mqtt/mqtt_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import "package:mqtt_client/mqtt_server_client.dart";

import "../../core.dart";

/// [Subscription] for the MQTT protocol.
class MqttSubscription implements Subscription {
/// [ProtocolSubscription] for the MQTT protocol.
final class MqttSubscription extends ProtocolSubscription {
/// Constructor.
MqttSubscription(
this._form,
this._client,
this._complete, {
super._complete, {
required void Function(Content content) next,
void Function(Exception error)? error,
}) : _active = true {
Expand Down Expand Up @@ -52,8 +52,6 @@ class MqttSubscription implements Subscription {

bool _active = true;

final void Function() _complete;

@override
bool get active => _active;

Expand All @@ -65,6 +63,7 @@ class MqttSubscription implements Subscription {
}) async {
_client.disconnect();
_active = false;
_complete();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
54 changes: 19 additions & 35 deletions lib/src/core/implementation/consumed_thing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class ConsumedThing implements scripting_api.ConsumedThing {
/// The [title] of the Thing.
final String title;

final Map<String, scripting_api.Subscription> _subscribedEvents = {};
final Set<String> _subscribedEvents = {};

final Map<String, scripting_api.Subscription> _observedProperties = {};
final Set<String> _observedProperties = {};

/// Determines the id of this [ConsumedThing].
String get identifier => thingDescription.identifier;
Expand Down Expand Up @@ -229,7 +229,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_observedProperties.containsKey(propertyName)) {
if (_observedProperties.contains(propertyName)) {
throw StateError(
"ConsumedThing '$title' already has a function "
"subscribed to $propertyName. You can only observe once",
Expand Down Expand Up @@ -259,15 +259,12 @@ class ConsumedThing implements scripting_api.ConsumedThing {
required Map<String, Object>? uriVariables,
}) async {
final OperationType operationType;
final Map<String, scripting_api.Subscription> subscriptions;

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
operationType = OperationType.observeproperty;
subscriptions = _observedProperties;
case scripting_api.SubscriptionType.event:
operationType = OperationType.subscribeevent;
subscriptions = _subscribedEvents;
}

final (client, form) = _getClientFor(
Expand All @@ -288,17 +285,10 @@ class ConsumedThing implements scripting_api.ConsumedThing {
onError(error);
}
},
complete: () => removeSubscription(affordanceName, subscriptionType),
complete: () => _removeSubscription(affordanceName, subscriptionType),
);

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
_observedProperties[affordanceName] = subscription;
case scripting_api.SubscriptionType.event:
_subscribedEvents[affordanceName] = subscription;
}

subscriptions[affordanceName] = subscription;
_addSubscription(affordanceName, subscriptionType);

return subscription;
}
Expand Down Expand Up @@ -376,7 +366,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_subscribedEvents.containsKey(eventName)) {
if (_subscribedEvents.contains(eventName)) {
throw DartWotException(
"ConsumedThing '$title' already has a function "
"subscribed to $eventName. You can only subscribe once.",
Expand Down Expand Up @@ -407,31 +397,25 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

/// Removes a subscription with a specified [key] and [type].
void removeSubscription(String key, scripting_api.SubscriptionType type) {
void _addSubscription(
String key,
scripting_api.SubscriptionType type,
) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.remove(key);
_observedProperties.add(key);
case scripting_api.SubscriptionType.event:
_subscribedEvents.remove(key);
_subscribedEvents.add(key);
}
}

/// Cleans up the resources used by this [ConsumedThing].
bool destroy({bool external = true}) {
for (final observedProperty in _observedProperties.values) {
observedProperty.stop();
}
_observedProperties.clear();
for (final subscribedEvent in _subscribedEvents.values) {
subscribedEvent.stop();
}
_subscribedEvents.clear();

if (external) {
return servient.deregisterConsumedThing(this);
/// Removes a subscription with a specified [key] and [type].
void _removeSubscription(String key, scripting_api.SubscriptionType type) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.remove(key);
case scripting_api.SubscriptionType.event:
_subscribedEvents.remove(key);
}

return false;
}
}
36 changes: 2 additions & 34 deletions lib/src/core/implementation/servient.dart
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class InternalServient implements Servient {
final List<ProtocolServer> _servers = [];
final Map<String, ProtocolClientFactory> _clientFactories = {};
final Map<String, ExposedThing> _things = {};
final Set<ConsumedThing> _consumedThings = {};

final ServerSecurityCallback? _serverSecurityCallback;

Expand Down Expand Up @@ -127,10 +126,6 @@ class InternalServient implements Servient {
clientFactory.destroy();
}
_clientFactories.clear();
for (final consumedThing in _consumedThings) {
consumedThing.destroy();
}
_consumedThings.clear();

final serverStatuses = _servers.map((server) => server.stop()).toList();
await Future.wait(serverStatuses);
Expand Down Expand Up @@ -177,29 +172,6 @@ class InternalServient implements Servient {
return true;
}

/// Removes and cleans up the resources of a [ConsumedThing].
///
/// If the [ConsumedThing] has not been registered before, `false` is
/// returned, otherwise `true`.
bool destroyConsumedThing(ConsumedThing consumedThing) {
return consumedThing.destroy(external: false);
}

/// De-registers the given [consumedThing].
///
/// If the [ConsumedThing] has not been registered before, `false` is
/// returned, otherwise `true`.
bool deregisterConsumedThing(ConsumedThing consumedThing) {
return _consumedThings.remove(consumedThing);
}

/// Adds a [ConsumedThing] to the servient if it hasn't been registered
/// before.
///
/// Returns `false` if the [thing] has already been registered, otherwise
/// `true`.
bool addConsumedThing(ConsumedThing thing) => _consumedThings.add(thing);

/// Returns an [ExposedThing] with the given [id] if it has been registered.
ExposedThing? thing(String id) => _things[id];

Expand Down Expand Up @@ -270,12 +242,8 @@ class InternalServient implements Servient {
/// Consumes a [ThingDescription] and returns a [scripting_api.ConsumedThing].
Future<scripting_api.ConsumedThing> consume(
ThingDescription thingDescription,
) async {
final newThing = ConsumedThing(this, thingDescription);
addConsumedThing(newThing);

return newThing;
}
) async =>
ConsumedThing(this, thingDescription);

/// Exposes a Thing based on an [scripting_api.ExposedThingInit].
Future<scripting_api.ExposedThing> produce(
Expand Down
1 change: 1 addition & 0 deletions lib/src/core/protocol_interfaces.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
export "protocol_interfaces/protocol_client.dart";
export "protocol_interfaces/protocol_client_factory.dart";
export "protocol_interfaces/protocol_server.dart";
export "protocol_interfaces/protocol_subscription.dart";
30 changes: 30 additions & 0 deletions lib/src/core/protocol_interfaces/protocol_subscription.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// SPDX-License-Identifier: BSD-3-Clause

import "package:meta/meta.dart";

import "../scripting_api.dart";

/// Base class for implementations of the [Subscription] interface.
abstract base class ProtocolSubscription implements Subscription {
/// Instantiates a new [ProtocolSubscription].
///
/// The [_complete] callback will be called when the [ProtocolSubscription]
/// has been [stop]ped (either internally or externally).
ProtocolSubscription(this._complete);

final void Function() _complete;

@override
@mustCallSuper
Future<void> stop({
int? formIndex,
Map<String, Object>? uriVariables,
Object? data,
}) async {
_complete();
}
}

0 comments on commit 9255bea

Please sign in to comment.