Skip to content

Commit

Permalink
Implement progressive call invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
muzzammilshahid committed Dec 31, 2024
1 parent 3c35228 commit a6c230a
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 12 deletions.
76 changes: 64 additions & 12 deletions lib/src/dealer.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import "package:meta/meta.dart";
import "package:wampproto/idgen.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/src/types.dart";
Expand All @@ -7,19 +8,43 @@ const optionReceiveProgress = "receive_progress";
const optionProgress = "progress";

class PendingInvocation {
PendingInvocation(this.requestID, this.callerID, this.calleeID, {required this.receiveProgress});
PendingInvocation(
this.requestID,
this.callerID,
this.calleeID, {
required this.progress,
required this.receiveProgress,
});

final int requestID;
final int callerID;
final int calleeID;
final bool progress;
final bool receiveProgress;
}

@immutable
class CallMap {
const CallMap(this.callerID, this.callID);

final int callerID;
final int callID;

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is CallMap && runtimeType == other.runtimeType && callerID == other.callerID && callID == other.callID;

@override
int get hashCode => callerID.hashCode ^ callID.hashCode;
}

class Dealer {
final Map<String, Registration> _registrationsByProcedure = {};
final Map<int, Map<int, Registration>> _registrationsBySession = {};
final Map<int, PendingInvocation> _pendingCalls = {};
final Map<int, SessionDetails> _sessions = {};
final Map<CallMap, int> _callToInvocationId = {};

final _idGen = SessionScopeIDGenerator();

Expand Down Expand Up @@ -57,6 +82,12 @@ class Dealer {
return _registrationsByProcedure.containsKey(procedure);
}

void _addCall(int callId, int invocationId, int callerId, int calleeId, bool progress, bool receiveProgress) {
_pendingCalls[invocationId] =
PendingInvocation(callId, callerId, calleeId, progress: progress, receiveProgress: receiveProgress);
_callToInvocationId[CallMap(callerId, callId)] = invocationId;
}

MessageWithRecipient receiveMessage(int sessionID, Message message) {
if (message is Call) {
var registration = _registrationsByProcedure[message.uri];
Expand All @@ -72,20 +103,34 @@ class Dealer {
}

var receiveProgress = message.options[optionReceiveProgress] ?? false;
int requestID = _idGen.next();
_pendingCalls[requestID] = PendingInvocation(
message.requestID,
sessionID,
calleeID,
receiveProgress: receiveProgress,
);
var progress = message.options[optionProgress] ?? false;
int? invocationID;
if (progress) {
invocationID = _callToInvocationId[CallMap(sessionID, message.requestID)];
if (invocationID == null) {
invocationID = _idGen.next();
_addCall(message.requestID, invocationID, sessionID, calleeID, progress, receiveProgress);
}
} else {
invocationID = _idGen.next();
_addCall(message.requestID, invocationID, sessionID, calleeID, progress, receiveProgress);
}

Map<String, dynamic> details = {};
if (receiveProgress) {
details[optionReceiveProgress] = true;
}

if (progress) {
details[optionProgress] = true;
}

var invocation = Invocation(
requestID,
invocationID,
registration.id,
args: message.args,
kwargs: message.kwargs,
details: receiveProgress ? {optionReceiveProgress: receiveProgress} : {},
details: details,
);
return MessageWithRecipient(invocation, calleeID);
} else if (message is Yield) {
Expand All @@ -105,6 +150,7 @@ class Dealer {
details[optionProgress] = receiveProgress;
} else {
_pendingCalls.remove(message.requestID);
_callToInvocationId.remove(CallMap(invocation.callerID, invocation.requestID));
}
var result = Result(invocation.requestID, args: message.args, kwargs: message.kwargs, details: details);
return MessageWithRecipient(result, invocation.callerID);
Expand Down Expand Up @@ -158,8 +204,14 @@ class Dealer {

_pendingCalls.remove(message.requestID);

var errMessage = Error(Call.id, pending.requestID, message.uri,
args: message.args, kwargs: message.kwargs, details: message.details);
var errMessage = Error(
Call.id,
pending.requestID,
message.uri,
args: message.args,
kwargs: message.kwargs,
details: message.details,
);

return MessageWithRecipient(errMessage, pending.callerID);
} else {
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies:
cbor: ^6.2.0
collection: ^1.18.0
crypto: ^3.0.3
meta: ^1.15.0
msgpack_dart:
git: https://github.com/xconnio/msgpack_dart
pinenacl: ^0.6.0
Expand Down
37 changes: 37 additions & 0 deletions test/dealer_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,42 @@ void main() {
throwsA(predicate((e) => e is Exception && e.toString().contains("no pending calls for session"))),
);
});

test("test progressive call invocation", () {
var procedureProCallInv = "io.xconn.test_progressive_call_inv";
var calleeId = 5;
var callerId = 6;
dealer
..addSession(SessionDetails(calleeId, "realm1", "authid", "authrole"))
..addSession(SessionDetails(callerId, "realm1", "authid", "authrole"));

var register = Register(1, procedureProCallInv);
dealer.receiveMessage(calleeId, register);

var call = Call(2, procedureProCallInv, options: {optionProgress: true});
var messageWithRecipient = dealer.receiveMessage(callerId, call);
expect(messageWithRecipient.recipient, calleeId);

var invMessage = messageWithRecipient.message as Invocation;
expect(invMessage.details[optionProgress], isTrue);
var invocationRequestId = invMessage.requestID;

for (var i = 0; i < 10; i++) {
var call = Call(2, procedureProCallInv, options: {optionProgress: true});
var messageWithRecipient = dealer.receiveMessage(callerId, call);
expect(messageWithRecipient.recipient, calleeId);

var invMessage = messageWithRecipient.message as Invocation;
expect(invMessage.details[optionProgress], isTrue);
expect(invMessage.requestID, equals(invocationRequestId));
}

var callMsg = Call(2, procedureProCallInv);
var msgWithRecipient = dealer.receiveMessage(callerId, callMsg);
expect(msgWithRecipient.recipient, calleeId);

var invocationMessage = msgWithRecipient.message as Invocation;
expect(invocationMessage.details.containsKey(optionProgress), isFalse);
});
});
}

0 comments on commit a6c230a

Please sign in to comment.