From a6c230a603140c758b63c407595bb10b4f331f45 Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Thu, 12 Sep 2024 15:31:31 +0500 Subject: [PATCH] Implement progressive call invocations --- lib/src/dealer.dart | 76 ++++++++++++++++++++++++++++++++++++------- pubspec.yaml | 1 + test/dealer_test.dart | 37 +++++++++++++++++++++ 3 files changed, 102 insertions(+), 12 deletions(-) diff --git a/lib/src/dealer.dart b/lib/src/dealer.dart index eaa907a..367ee2a 100644 --- a/lib/src/dealer.dart +++ b/lib/src/dealer.dart @@ -1,3 +1,4 @@ +import "package:meta/meta.dart"; import "package:wampproto/idgen.dart"; import "package:wampproto/messages.dart"; import "package:wampproto/src/types.dart"; @@ -7,19 +8,43 @@ const optionReceiveProgress = "receive_progress"; const optionProgress = "progress"; class PendingInvocation { - PendingInvocation(this.requestID, this.callerID, this.calleeID, {required this.receiveProgress}); + PendingInvocation( + this.requestID, + this.callerID, + this.calleeID, { + required this.progress, + required this.receiveProgress, + }); final int requestID; final int callerID; final int calleeID; + final bool progress; final bool receiveProgress; } +@immutable +class CallMap { + const CallMap(this.callerID, this.callID); + + final int callerID; + final int callID; + + @override + bool operator ==(Object other) => + identical(this, other) || + other is CallMap && runtimeType == other.runtimeType && callerID == other.callerID && callID == other.callID; + + @override + int get hashCode => callerID.hashCode ^ callID.hashCode; +} + class Dealer { final Map _registrationsByProcedure = {}; final Map> _registrationsBySession = {}; final Map _pendingCalls = {}; final Map _sessions = {}; + final Map _callToInvocationId = {}; final _idGen = SessionScopeIDGenerator(); @@ -57,6 +82,12 @@ class Dealer { return _registrationsByProcedure.containsKey(procedure); } + void _addCall(int callId, int invocationId, int callerId, int calleeId, bool progress, bool receiveProgress) { + _pendingCalls[invocationId] = + PendingInvocation(callId, callerId, calleeId, progress: progress, receiveProgress: receiveProgress); + _callToInvocationId[CallMap(callerId, callId)] = invocationId; + } + MessageWithRecipient receiveMessage(int sessionID, Message message) { if (message is Call) { var registration = _registrationsByProcedure[message.uri]; @@ -72,20 +103,34 @@ class Dealer { } var receiveProgress = message.options[optionReceiveProgress] ?? false; - int requestID = _idGen.next(); - _pendingCalls[requestID] = PendingInvocation( - message.requestID, - sessionID, - calleeID, - receiveProgress: receiveProgress, - ); + var progress = message.options[optionProgress] ?? false; + int? invocationID; + if (progress) { + invocationID = _callToInvocationId[CallMap(sessionID, message.requestID)]; + if (invocationID == null) { + invocationID = _idGen.next(); + _addCall(message.requestID, invocationID, sessionID, calleeID, progress, receiveProgress); + } + } else { + invocationID = _idGen.next(); + _addCall(message.requestID, invocationID, sessionID, calleeID, progress, receiveProgress); + } + + Map details = {}; + if (receiveProgress) { + details[optionReceiveProgress] = true; + } + + if (progress) { + details[optionProgress] = true; + } var invocation = Invocation( - requestID, + invocationID, registration.id, args: message.args, kwargs: message.kwargs, - details: receiveProgress ? {optionReceiveProgress: receiveProgress} : {}, + details: details, ); return MessageWithRecipient(invocation, calleeID); } else if (message is Yield) { @@ -105,6 +150,7 @@ class Dealer { details[optionProgress] = receiveProgress; } else { _pendingCalls.remove(message.requestID); + _callToInvocationId.remove(CallMap(invocation.callerID, invocation.requestID)); } var result = Result(invocation.requestID, args: message.args, kwargs: message.kwargs, details: details); return MessageWithRecipient(result, invocation.callerID); @@ -158,8 +204,14 @@ class Dealer { _pendingCalls.remove(message.requestID); - var errMessage = Error(Call.id, pending.requestID, message.uri, - args: message.args, kwargs: message.kwargs, details: message.details); + var errMessage = Error( + Call.id, + pending.requestID, + message.uri, + args: message.args, + kwargs: message.kwargs, + details: message.details, + ); return MessageWithRecipient(errMessage, pending.callerID); } else { diff --git a/pubspec.yaml b/pubspec.yaml index a28fa3d..0bd1c75 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: cbor: ^6.2.0 collection: ^1.18.0 crypto: ^3.0.3 + meta: ^1.15.0 msgpack_dart: git: https://github.com/xconnio/msgpack_dart pinenacl: ^0.6.0 diff --git a/test/dealer_test.dart b/test/dealer_test.dart index d698429..b4f61a1 100644 --- a/test/dealer_test.dart +++ b/test/dealer_test.dart @@ -141,5 +141,42 @@ void main() { throwsA(predicate((e) => e is Exception && e.toString().contains("no pending calls for session"))), ); }); + + test("test progressive call invocation", () { + var procedureProCallInv = "io.xconn.test_progressive_call_inv"; + var calleeId = 5; + var callerId = 6; + dealer + ..addSession(SessionDetails(calleeId, "realm1", "authid", "authrole")) + ..addSession(SessionDetails(callerId, "realm1", "authid", "authrole")); + + var register = Register(1, procedureProCallInv); + dealer.receiveMessage(calleeId, register); + + var call = Call(2, procedureProCallInv, options: {optionProgress: true}); + var messageWithRecipient = dealer.receiveMessage(callerId, call); + expect(messageWithRecipient.recipient, calleeId); + + var invMessage = messageWithRecipient.message as Invocation; + expect(invMessage.details[optionProgress], isTrue); + var invocationRequestId = invMessage.requestID; + + for (var i = 0; i < 10; i++) { + var call = Call(2, procedureProCallInv, options: {optionProgress: true}); + var messageWithRecipient = dealer.receiveMessage(callerId, call); + expect(messageWithRecipient.recipient, calleeId); + + var invMessage = messageWithRecipient.message as Invocation; + expect(invMessage.details[optionProgress], isTrue); + expect(invMessage.requestID, equals(invocationRequestId)); + } + + var callMsg = Call(2, procedureProCallInv); + var msgWithRecipient = dealer.receiveMessage(callerId, callMsg); + expect(msgWithRecipient.recipient, calleeId); + + var invocationMessage = msgWithRecipient.message as Invocation; + expect(invocationMessage.details.containsKey(optionProgress), isFalse); + }); }); }