Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-536] Workflow update not picked up by Syncer #1029

Merged
merged 6 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion pkg/loop/internal/core/services/capability/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,24 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
}
responseCh, err := t.impl.RegisterTrigger(server.Context(), req)
if err != nil {
return fmt.Errorf("error registering trigger: %w", err)
// the first message sent to the client will be an ack or error message, this is done in order to syncronize the client and server and avoid
// errors to unregister not found triggers. If the error is not nil, we send an error message to the client and return the error
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Response{
Response: &capabilitiespb.TriggerResponse{
Error: err.Error(),
},
},
}
return server.Send(msg)
}

// Send ACK response to client
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Ack{},
}
if err = server.Send(msg); err != nil {
return fmt.Errorf("failed sending ACK response for trigger registration %s: %w", request, err)
}

defer func() {
Expand Down Expand Up @@ -270,6 +287,17 @@ func (t *triggerExecutableClient) RegisterTrigger(ctx context.Context, req capab
return nil, fmt.Errorf("error registering trigger: %w", err)
}

// In order to ensure the registration is successful, we need to wait for the first message from the server.
// This will be an ack or error message. If the error is not nil, we return an error.
ackMsg, err := responseStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive registering trigger ack message: %w", err)
}

if ackMsg.GetAck() == nil {
return nil, errors.New(fmt.Sprintf("failed registering trigger: %s", ackMsg.GetResponse().GetError()))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agparadiso This code is a bit ambiguous I think: what happens if the server sends a response without an ack? We should clearly error in that case as that would mean the protocol has been violated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to send: TriggerResponseMessage_Ack and fail if ackMsg.GetAck() == nil much cleaner, thanks🙇🏼


return forwardTriggerResponsesToChannel(ctx, t.Logger, req, responseStream.Recv)
}

Expand Down
35 changes: 31 additions & 4 deletions pkg/loop/internal/core/services/capability/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (

type mockTrigger struct {
capabilities.BaseCapability
callback chan capabilities.TriggerResponse
triggerActive bool
unregisterCalls chan bool
registerCalls chan bool
callback chan capabilities.TriggerResponse
triggerActive bool
unregisterCalls chan bool
registerCalls chan bool
failedToRegisterErr *string

mu sync.Mutex
}
Expand All @@ -39,6 +40,10 @@ func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.
return nil, errors.New("already registered")
}

if m.failedToRegisterErr != nil {
return nil, errors.New(*m.failedToRegisterErr)
}

m.triggerActive = true

m.registerCalls <- true
Expand Down Expand Up @@ -170,6 +175,28 @@ func newCapabilityPlugin(t *testing.T, capability capabilities.BaseCapability) (
return regClient.(capabilities.BaseCapability), client, server, nil
}

func Test_RegisterTrigger(t *testing.T) {
testContext := tests.Context(t)
t.Run("async RegisterTrigger implementation returns error to server", func(t *testing.T) {
ctx, cancel := context.WithCancel(testContext)
defer cancel()

errMsg := "boom"
mtr := mustMockTrigger(t)
mtr.failedToRegisterErr = &errMsg

tr, _, _, err := newCapabilityPlugin(t, mtr)
require.NoError(t, err)

ctr := tr.(capabilities.TriggerCapability)

_, err = ctr.RegisterTrigger(
ctx,
capabilities.TriggerRegistrationRequest{})
require.ErrorContains(t, err, fmt.Sprintf("failed registering trigger: %s", errMsg))
})
}

func Test_Capabilities(t *testing.T) {
testContext := tests.Context(t)

Expand Down
Loading