Skip to content

Commit

Permalink
Merge pull request #69 from muzzammilshahid/progressive-call-results
Browse files Browse the repository at this point in the history
Implement progressive call results
  • Loading branch information
muzzammilshahid authored Sep 20, 2024
2 parents aaf15c0 + 2bd4e84 commit fd8f83f
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 14 deletions.
26 changes: 20 additions & 6 deletions dealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/xconnio/wampproto-go/messages"
)

const (
OptionReceiveProgress = "receive_progress"
OptionProgress = "progress"
)

type PendingInvocation struct {
RequestID int64
CallerID int64
Expand Down Expand Up @@ -107,20 +112,23 @@ func (d *Dealer) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
callee = session
break
}
receiveProgress, _ := call.Options()[OptionReceiveProgress].(bool)

invocationID := d.idGen.NextID()
d.pendingCalls[invocationID] = &PendingInvocation{
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
ReceiveProgress: receiveProgress,
}

var invocation *messages.Invocation
if call.PayloadIsBinary() && d.sessions[callee].StaticSerializer() {
invocation = messages.NewInvocationBinary(invocationID, regs.ID, nil, call.Payload(),
call.PayloadSerializer())
} else {
invocation = messages.NewInvocation(invocationID, regs.ID, nil, call.Args(), call.KwArgs())
details := map[string]any{OptionReceiveProgress: receiveProgress}
invocation = messages.NewInvocation(invocationID, regs.ID, details, call.Args(), call.KwArgs())
}

return &MessageWithRecipient{Message: invocation, Recipient: callee}, nil
Expand All @@ -131,13 +139,19 @@ func (d *Dealer) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
return nil, fmt.Errorf("yield: not pending calls for session %d", sessionID)
}

delete(d.pendingCalls, yield.RequestID())
progress, _ := yield.Options()[OptionProgress].(bool)
var details map[string]any
if pending.ReceiveProgress && progress {
details = map[string]any{OptionProgress: progress}
} else {
delete(d.pendingCalls, yield.RequestID())
}

var result *messages.Result
if yield.PayloadIsBinary() && d.sessions[pending.CallerID].StaticSerializer() {
result = messages.NewResultBinary(pending.RequestID, nil, yield.Payload(), yield.PayloadSerializer())
} else {
result = messages.NewResult(pending.RequestID, nil, yield.Args(), yield.KwArgs())
result = messages.NewResult(pending.RequestID, details, yield.Args(), yield.KwArgs())
}

return &MessageWithRecipient{Message: result, Recipient: pending.CallerID}, nil
Expand Down
42 changes: 42 additions & 0 deletions dealer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,45 @@ func TestDealerRegisterUnregister(t *testing.T) {
})
})
}

func TestProgressiveCallResults(t *testing.T) {
dealer := wampproto.NewDealer()

callee := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
caller := wampproto.NewSessionDetails(2, "realm", "authid", "anonymous", false)

err := dealer.AddSession(callee)
require.NoError(t, err)
err = dealer.AddSession(caller)
require.NoError(t, err)

register := messages.NewRegister(1, nil, "foo.bar")
_, err = dealer.ReceiveMessage(callee.ID(), register)
require.NoError(t, err)

call := messages.NewCall(caller.ID(), map[string]any{wampproto.OptionReceiveProgress: true}, "foo.bar", []any{}, nil)
messageWithRecipient, err := dealer.ReceiveMessage(callee.ID(), call)
require.NoError(t, err)
require.Equal(t, callee.ID(), messageWithRecipient.Recipient)
invocation := messageWithRecipient.Message.(*messages.Invocation)
require.True(t, invocation.Details()[wampproto.OptionReceiveProgress].(bool))

for i := 0; i < 10; i++ {
yield := messages.NewYield(invocation.RequestID(), map[string]any{wampproto.OptionProgress: true}, []any{}, nil)
messageWithRecipient, err = dealer.ReceiveMessage(callee.ID(), yield)
require.NoError(t, err)
require.Equal(t, callee.ID(), messageWithRecipient.Recipient)
result := messageWithRecipient.Message.(*messages.Result)
require.Equal(t, call.RequestID(), result.RequestID())
require.True(t, result.Details()[wampproto.OptionProgress].(bool))
}

yield := messages.NewYield(invocation.RequestID(), map[string]any{}, []any{}, nil)
messageWithRecipient, err = dealer.ReceiveMessage(callee.ID(), yield)
require.NoError(t, err)
require.Equal(t, callee.ID(), messageWithRecipient.Recipient)
result := messageWithRecipient.Message.(*messages.Result)
require.Equal(t, call.RequestID(), result.RequestID())
progress, _ := result.Details()[wampproto.OptionReceiveProgress].(bool)
require.False(t, progress)
}
2 changes: 1 addition & 1 deletion idgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const maxID int64 = 1 << 53

func init() {
source := rand.NewSource(uint64(time.Now().UnixNano()))
source := rand.NewSource(uint64(time.Now().UnixNano())) // #nosec
rand.New(source)
}

Expand Down
5 changes: 4 additions & 1 deletion joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ var ClientRoles = map[string]any{ //nolint:gochecknoglobals
"features": map[string]any{},
},
"callee": map[string]any{
"features": map[string]any{},
"features": map[string]any{
"progressive_call_results": true,
"call_canceling": true,
},
},
"publisher": map[string]any{
"features": map[string]any{},
Expand Down
4 changes: 2 additions & 2 deletions messages/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func AsInt64(i interface{}) (int64, bool) {
case int64:
return v, true
case uint64:
return int64(v), true
return int64(v), true // #nosec
case uint8:
return int64(v), true
case int:
Expand All @@ -302,7 +302,7 @@ func AsInt64(i interface{}) (int64, bool) {
case int32:
return int64(v), true
case uint:
return int64(v), true
return int64(v), true // #nosec
case uint16:
return int64(v), true
case uint32:
Expand Down
8 changes: 6 additions & 2 deletions messages/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,12 @@ func TestValidateMessage(t *testing.T) {
wampMsg := []any{1, "io.xconn.test", map[string]any{}, "invalidType", "extra"}
_, err := messages.ValidateMessage(wampMsg, spec)

require.EqualError(t, err, `item at index 3 must be of type []any but was string
item at index 4 must be of type map[string]any but was string`)
require.Contains(t, []string{
`item at index 3 must be of type []any but was string
item at index 4 must be of type map[string]any but was string`,
`item at index 4 must be of type map[string]any but was string
item at index 3 must be of type []any but was string`,
}, err.Error())
})
}

Expand Down
12 changes: 10 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func (w *Session) SendMessage(msg messages.Message) ([]byte, error) {
return data, nil
case messages.MessageTypeYield:
yield := msg.(*messages.Yield)
w.invocationRequests.Delete(yield.RequestID())
progress, _ := yield.Options()[OptionProgress].(bool)
if !progress {
w.invocationRequests.Delete(yield.RequestID())
}

return data, nil
case messages.MessageTypeRegister:
Expand Down Expand Up @@ -125,11 +128,16 @@ func (w *Session) ReceiveMessage(msg messages.Message) (messages.Message, error)
switch msg.Type() {
case messages.MessageTypeResult:
result := msg.(*messages.Result)
_, exists := w.callRequests.LoadAndDelete(result.RequestID())
_, exists := w.callRequests.Load(result.RequestID())
if !exists {
return nil, fmt.Errorf("received RESULT for invalid requestID")
}

progress, _ := result.Details()[OptionProgress].(bool)
if !progress {
w.callRequests.Delete(result.RequestID())
}

return result, nil
case messages.MessageTypeRegistered:
registered := msg.(*messages.Registered)
Expand Down

0 comments on commit fd8f83f

Please sign in to comment.