Skip to content

Commit

Permalink
Add oneOff to send() and onInitialMessage() to MultiListener mixin (#48)
Browse files Browse the repository at this point in the history
* Add oneOff parameter to send()

* Add onInitialMessage() to MultiListener mixin

* Change Listener stream controller and subscription from late final to
nullable

* Add _BufferedMessage as wrapper on messages buffered in
MessageSinkRegister

* Update changelog and pubspec
  • Loading branch information
lewandowski-jan authored Feb 12, 2023
1 parent 53de015 commit 492d764
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 26 deletions.
9 changes: 8 additions & 1 deletion packages/comms/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.0.9

- Add `oneOff` to `send()` for marking message as available to read only once
from buffer

- Add `onInitialMessage()` to `MultiListener` mixin

## 0.0.8+2

- Add README (#46)
Expand All @@ -12,7 +19,7 @@

## 0.0.7

- Add `onInitialMessage()` method to the `Listener` mixin (#41)
- Add `onInitialMessage()` to `Listener` mixin (#41)

## 0.0.6

Expand Down
15 changes: 7 additions & 8 deletions packages/comms/lib/src/listener.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ typedef OnMessage<Message> = void Function(Message message);
///
/// * [MultiListener], which enables listening to multiple message types.
mixin Listener<Message> {
late final StreamController<Message> _messageStreamController;

late final StreamSubscription<Message> _messageSubscription;
StreamController<Message>? _messageStreamController;
StreamSubscription<Message>? _messageSubscription;

/// Unique identifier of the [Listener]'s messageSink in [MessageSinkRegister].
String? _id;
Expand All @@ -35,18 +34,18 @@ mixin Listener<Message> {
}
_messageStreamController = StreamController<Message>();
_id = MessageSinkRegister()._add(
_messageStreamController.sink,
_messageStreamController!.sink,
onInitialMessage: onInitialMessage,
);
_messageSubscription = _messageStreamController.stream.listen(onMessage);
_messageSubscription = _messageStreamController!.stream.listen(onMessage);
}

/// Called every time a new [message] is received.
@protected
void onMessage(Message message);

/// Called when registering [Listener] if a message of type [Message] was
/// already sent by [Sender] earlier.
/// sent earlier by [Sender].
@protected
void onInitialMessage(Message message) {}

Expand All @@ -60,8 +59,8 @@ mixin Listener<Message> {
void cancel() {
if (_id != null) {
MessageSinkRegister()._remove(_id!);
_messageStreamController.close();
_messageSubscription.cancel();
_messageStreamController?.close();
_messageSubscription?.cancel();
_id = null;
}
}
Expand Down
34 changes: 27 additions & 7 deletions packages/comms/lib/src/message_sink_register.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,28 @@ class MessageSinkRegister {
final _messageSinks = <String, StreamSink>{};

/// All last messages sent with each type
final _messageBuffers = <Type, dynamic>{};
final _messageBuffers = <Type, _BufferedMessage<dynamic>>{};

/// Adds a [messageSink] to [MessageSinkRegister]'s [_messageSinks] with
/// unique id from [_uuid]
String _add<Message>(
StreamSink<Message> messageSink, {
OnMessage<Message>? onInitialMessage,
required OnMessage<Message> onInitialMessage,
}) {
final id = _uuid.v1();
_messageSinks[id] = messageSink;
_log('Added sink ${messageSink.runtimeType}');

final bufferedMessage = _messageBuffers[Message] as Message?;
if (bufferedMessage != null) {
onInitialMessage?.call(bufferedMessage);
final bufferedMessage = _messageBuffers[Message];

final message = bufferedMessage?.message as Message?;

if (message != null) {
onInitialMessage(message);

if (bufferedMessage?.oneOff ?? false) {
_messageBuffers.remove(Message);
}
}

return id;
Expand All @@ -80,11 +87,24 @@ class MessageSinkRegister {

/// Adds [message] to all sinks in [MessageSinkRegister]'s [_messageSinks]
/// of type [Message] and updates [_messageBuffers].
void sendToSinksOfType<Message>(Message message) {
void sendToSinksOfType<Message>(Message message, {bool oneOff = false}) {
getSinksOfType<Message>().forEach(
(sink) => sink.add(message),
);

_messageBuffers[Message] = message;
_messageBuffers[Message] = _BufferedMessage<Message>(
message: message,
oneOff: oneOff,
);
}
}

class _BufferedMessage<Message> {
_BufferedMessage({
required this.message,
required this.oneOff,
});

final Message message;
final bool oneOff;
}
21 changes: 18 additions & 3 deletions packages/comms/lib/src/multi_listener.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@ class ListenerDelegate<T> with Listener<T> {
ListenerDelegate();

late final OnMessage _onMessage;
late final OnMessage _onInitialMessage;

@protected
@nonVirtual
void _init(OnMessage onMessage) {
void _init(OnMessage onMessage, OnMessage onInitialMessage) {
_onMessage = onMessage;
_onInitialMessage = onInitialMessage;
listen();
}

@protected
@nonVirtual
@override
void onMessage(T message) => _onMessage(message);

@protected
@nonVirtual
@override
void onInitialMessage(T message) => _onInitialMessage(message);
}

/// A mixin used on classes that want to receive messages of multiple types.
Expand All @@ -41,14 +48,22 @@ mixin MultiListener {
@nonVirtual
void listen() => listenerDelegates.forEach(_listen);

void _listen(ListenerDelegate listenerDelegate) =>
listenerDelegate.._init(onMessage);
void _listen(ListenerDelegate listenerDelegate) => listenerDelegate
.._init(
onMessage,
onInitialMessage,
);

/// Called every time a new [message] of type specified in [listenerDelegates]
/// is received.
@protected
void onMessage(dynamic message);

/// Called when registering [MultiListener] if a message of type specified in
/// [listenerDelegates] was sent earlier by [Sender].
@protected
void onInitialMessage(dynamic message) {}

/// Stops receiving messages.
///
/// Removes message sinks from [MessageSinkRegister].
Expand Down
15 changes: 9 additions & 6 deletions packages/comms/lib/src/sender.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ part of '../comms.dart';

/// Signature for functions sending message to [Listener]s listening for type
/// [Message].
typedef Send<Message> = void Function(Message message);
typedef Send<Message> = void Function(Message message, {bool oneOff});

/// A mixin used on classes that want to send messages of type [Message], by
/// providing [send] function.
Expand All @@ -11,16 +11,19 @@ typedef Send<Message> = void Function(Message message);
///
/// * [getSend], which returns [send] function
mixin Sender<Message> {
/// Sends [message] to all [Listener]s of type [Message].
///
/// When [oneOff] is `true` [message] will be removed from the buffer after
/// the first [Listener] of same type calls listen().
@protected
@nonVirtual
void send(Message message) {
MessageSinkRegister().sendToSinksOfType<Message>(message);
void send(Message message, {bool oneOff = false}) {
MessageSinkRegister().sendToSinksOfType<Message>(message, oneOff: oneOff);
}
}

/// Returns function to send messages to all [Listener]s of type of the type
/// parameterer [Message], without the need of instatiating class with [Sender]
/// mixin.
/// Returns function to send messages to all [Listener]s of type [Message],
/// without the need of instatiating class with [Sender] mixin.
Send<Message> getSend<Message>() {
return MessageSinkRegister().sendToSinksOfType<Message>;
}
2 changes: 1 addition & 1 deletion packages/comms/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: comms
description: Simple communication pattern abstraction on streams, created for communication between logic classes.
version: 0.0.8+2
version: 0.0.9
homepage: https://github.com/leancodepl/comms

environment:
Expand Down

0 comments on commit 492d764

Please sign in to comment.