Skip to content

Commit

Permalink
Add ability to pass optional message adapter for receiving/publishing…
Browse files Browse the repository at this point in the history
… data (#16)
  • Loading branch information
alfianlosari authored Jul 20, 2023
1 parent 4a5f8bb commit 9bd6fc3
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 32 deletions.
82 changes: 58 additions & 24 deletions courier_dart_sdk/lib/courier_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ abstract class CourierClient {
void destroy();
void subscribe(String topic, QoS qos);
void unsubscribe(String topic);
Stream<T> courierMessageStream<T>(String topic, {dynamic decoder});
Stream<T> courierMessageStream<T>(String topic,
{MessageAdapter adapter, dynamic decoder});
Stream<Uint8List> courierBytesStream(String topic);

void publishCourierMessage(CourierMessage message, {dynamic encoder});
void publishCourierMessage(CourierMessage message,
{MessageAdapter adapter, dynamic encoder});
void publishCourierBytes(Uint8List bytes, String topic, QoS qos);
Stream<CourierEvent> courierEventStream();

static CourierClient create(
Expand Down Expand Up @@ -93,39 +96,55 @@ class _CourierClientImpl implements CourierClient {
}

@override
void publishCourierMessage(CourierMessage message, {dynamic encoder}) {
void publishCourierMessage(CourierMessage message,
{MessageAdapter? adapter, dynamic encoder}) {
log('Send method invoked');
_sendMessage(message, encoder);
_sendMessage(message, adapter, encoder);
}

@override
Stream<T> courierMessageStream<T>(String topic, {dynamic decoder}) {
void publishCourierBytes(Uint8List bytes, String topic, QoS qos) {
publishCourierMessage(
CourierMessage(payload: bytes, topic: topic, qos: qos),
adapter: const BytesMessageAdapter());
}

@override
Stream<T> courierMessageStream<T>(String topic,
{MessageAdapter? adapter, dynamic decoder}) {
log('courier message stream, topic: $topic $T');
return messageStreamController.stream
.where((event) => event.topic == topic)
.map((event) {
log('Decoding topic: $topic $T');
final bytes = event.payload as Uint8List;
for (final adapter in messageAdapters) {
try {
T item = adapter.decode(bytes, decoder);
log('Decoding success with $adapter');
return item;
} catch (error) {
log('Decoding Adapter $adapter not compatible ${error.toString()}');
if (adapter != null) {
T item = adapter.decode(bytes, decoder);
log('Decoding success with provided $adapter');
return item;
} else {
for (final adapter in messageAdapters) {
try {
T item = adapter.decode(bytes, decoder);
log('Decoding success with $adapter');
return item;
} catch (error) {
log('Decoding Adapter $adapter not compatible ${error.toString()}');
}
}
}
T item = decoder(bytes);
log('Decoding success for $T using decoder closure');
return item;
}).handleError((e) {
log('Error Decode $T for $topic' + e.toString());
}).map((v) => v as T);
});
}

@override
Stream<Uint8List> courierBytesStream(String topic) =>
courierMessageStream<Uint8List>(topic);
courierMessageStream<Uint8List>(topic,
adapter: const BytesMessageAdapter());

@override
Stream<CourierEvent> courierEventStream() {
Expand Down Expand Up @@ -202,20 +221,35 @@ class _CourierClientImpl implements CourierClient {
}
}

Future<void> _sendMessage(CourierMessage message, dynamic encoder) async {
Future<void> _sendMessage(
CourierMessage message, MessageAdapter? adapter, dynamic encoder) async {
try {
log('Send/Encoding: topic ${message.topic} ${message.payload.toString()}');
for (final adapter in messageAdapters) {
try {
final map = _convertToMap(message, adapter, encoder);
_platform.invokeMethod('send', map);
log('Send/Encoding success with adapter $adapter topic ${message.topic} ${message.payload.toString()}');
} catch (error) {
log('Encoding Adapter $adapter not compatible ${error.toString()}');
log(error.toString());
if (adapter != null) {
final map = _convertToMap(message, adapter, encoder);
_platform.invokeMethod('send', map);
log('Send/Encoding success with provided adapter $adapter topic ${message.topic} ${message.payload.toString()}');
return;
} else {
for (final adapter in messageAdapters) {
try {
final map = _convertToMap(message, adapter, encoder);
_platform.invokeMethod('send', map);
log('Send/Encoding success with adapter $adapter topic ${message.topic} ${message.payload.toString()}');
return;
} catch (error) {
log('Encoding Adapter $adapter not compatible ${error.toString()}');
log(error.toString());
}
}
}
throw Exception("Not supported");
final map = {
"message": encoder(message.payload),
"topic": message.topic,
"qos": message.qos.value
};
_platform.invokeMethod('send', map);
log('Send/Encoding success with encoder. topic ${message.topic} ${message.payload.toString()}');
} catch (e) {
log('Send/Encoding failed: topic ${message.topic} } ${e.toString()}');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ class JSONMessageAdapter extends MessageAdapter {

@override
Uint8List encode(Object object, String topic, dynamic encoder) {
if (encoder != null) {
return encoder(object);
}
final json = jsonEncode(object);
final List<int> codeUnits = json.codeUnits;
final Uint8List bytes = Uint8List.fromList(codeUnits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ class StringMessageAdapter extends MessageAdapter {

@override
Uint8List encode(Object object, String topic, dynamic encoder) {
if (object is String) {
List<int> bytes = utf8.encode(object);
return Uint8List.fromList(bytes);
}
return encoder(object);
List<int> bytes = utf8.encode(object as String);
return Uint8List.fromList(bytes);
}
}
26 changes: 26 additions & 0 deletions courier_dart_sdk_demo/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ class MyHomePage extends StatelessWidget {
log("Message received person: ${person.name}");
});

courierClient
.courierMessageStream<Person>(
"person/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update",
adapter: const JSONMessageAdapter(),
decoder: Person.fromJson)
.listen((person) {
log("Message received person using explicit JSON Adapter: ${person.name}");
});

courierClient.subscribe(
"pet/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update", QoS.one);

Expand All @@ -141,6 +150,14 @@ class MyHomePage extends StatelessWidget {
log("Message received Pet: ${pet.name}");
});

courierClient
.courierMessageStream<Uint8List>(
"pet/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update",
adapter: const BytesMessageAdapter())
.listen((petBytes) {
log("Message received PetBytes using explicit bytes message adapter: ${petBytes.toString()}");
});

courierClient
.courierBytesStream(
"pet/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update",
Expand Down Expand Up @@ -176,6 +193,15 @@ class MyHomePage extends StatelessWidget {
topic: "person/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update",
qos: QoS.one));

courierClient.publishCourierMessage(
CourierMessage(
payload: Person(
name:
textMessage + "explicit encoding with JSONMessageAdapter"),
topic: "person/6b57d4e5-0fce-4917-b343-c8a1c77405e5/update",
qos: QoS.one),
adapter: const JSONMessageAdapter());

final pet = Pet();
pet.name = "Hello Pet";
courierClient.publishCourierMessage(CourierMessage(
Expand Down

0 comments on commit 9bd6fc3

Please sign in to comment.