From 3ba24f4be5c3f5e8ec645d435735a7ebe4734a29 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Fri, 14 Feb 2025 11:45:24 +0100 Subject: [PATCH 1/4] fix: syncronize RegisterTrigger client and server using a first ack/err message --- .../core/services/capability/capabilities.go | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pkg/loop/internal/core/services/capability/capabilities.go b/pkg/loop/internal/core/services/capability/capabilities.go index f1e55c576..e6b9778e9 100644 --- a/pkg/loop/internal/core/services/capability/capabilities.go +++ b/pkg/loop/internal/core/services/capability/capabilities.go @@ -215,9 +215,31 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge } responseCh, err := t.impl.RegisterTrigger(server.Context(), req) if err != nil { + // the first message sent to the client will be an ack or error message, this is done in order to syncronize the client and server and avoid + // errors to unregister not found triggers. If the error is not nil, we send an error message to the client and return the error + msg := &capabilitiespb.TriggerResponseMessage{ + Message: &capabilitiespb.TriggerResponseMessage_Response{ + Response: &capabilitiespb.TriggerResponse{ + Error: err.Error(), + }, + }, + } + if err = server.Send(msg); err != nil { + return fmt.Errorf("failed sending error response for trigger registration %s: %w", request, err) + } return fmt.Errorf("error registering trigger: %w", err) } + // Send ACK response to client + msg := &capabilitiespb.TriggerResponseMessage{ + Message: &capabilitiespb.TriggerResponseMessage_Response{ + Response: &capabilitiespb.TriggerResponse{}, + }, + } + if err = server.Send(msg); err != nil { + return fmt.Errorf("failed sending ACK response for trigger registration %s: %w", request, err) + } + defer func() { // Always attempt to unregister the trigger to ensure any related resources are cleaned up err = t.impl.UnregisterTrigger(server.Context(), req) @@ -270,6 +292,17 @@ func (t *triggerExecutableClient) RegisterTrigger(ctx context.Context, req capab return nil, fmt.Errorf("error registering trigger: %w", err) } + // In order to ensure the registration is successful, we need to wait for the first message from the server. + // This will be an ack or error message. If the error is not nil, we return an error. + ackMsg, err := responseStream.Recv() + if err != nil { + return nil, fmt.Errorf("failed to receive registering trigger ack message: %w", err) + } + + if ackMsg.GetResponse().GetError() != "" { + return nil, errors.New(fmt.Sprintf("failed registering trigger: %s", ackMsg.GetResponse().GetError())) + } + return forwardTriggerResponsesToChannel(ctx, t.Logger, req, responseStream.Recv) } From 46388e4a79ef1c678dacfd4ad5f9b878a53aa77b Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Tue, 18 Feb 2025 16:07:46 +0100 Subject: [PATCH 2/4] test: add tests covering the error on syncronization scenario --- .../services/capability/capabilities_test.go | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/pkg/loop/internal/core/services/capability/capabilities_test.go b/pkg/loop/internal/core/services/capability/capabilities_test.go index 509e04ac4..6c1bd3c75 100644 --- a/pkg/loop/internal/core/services/capability/capabilities_test.go +++ b/pkg/loop/internal/core/services/capability/capabilities_test.go @@ -23,10 +23,11 @@ import ( type mockTrigger struct { capabilities.BaseCapability - callback chan capabilities.TriggerResponse - triggerActive bool - unregisterCalls chan bool - registerCalls chan bool + callback chan capabilities.TriggerResponse + triggerActive bool + unregisterCalls chan bool + registerCalls chan bool + failedToRegisterErr *string mu sync.Mutex } @@ -39,6 +40,10 @@ func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities. return nil, errors.New("already registered") } + if m.failedToRegisterErr != nil { + return nil, errors.New(*m.failedToRegisterErr) + } + m.triggerActive = true m.registerCalls <- true @@ -170,6 +175,28 @@ func newCapabilityPlugin(t *testing.T, capability capabilities.BaseCapability) ( return regClient.(capabilities.BaseCapability), client, server, nil } +func Test_RegisterTrigger(t *testing.T) { + testContext := tests.Context(t) + t.Run("async RegisterTrigger implementation returns error to server", func(t *testing.T) { + ctx, cancel := context.WithCancel(testContext) + defer cancel() + + errMsg := "boom" + mtr := mustMockTrigger(t) + mtr.failedToRegisterErr = &errMsg + + tr, _, _, err := newCapabilityPlugin(t, mtr) + require.NoError(t, err) + + ctr := tr.(capabilities.TriggerCapability) + + _, err = ctr.RegisterTrigger( + ctx, + capabilities.TriggerRegistrationRequest{}) + require.ErrorContains(t, err, fmt.Sprintf("failed registering trigger: %s", errMsg)) + }) +} + func Test_Capabilities(t *testing.T) { testContext := tests.Context(t) From 17473fd30560190f54c676f4bb7983b2bb831b10 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Wed, 19 Feb 2025 12:33:48 +0100 Subject: [PATCH 3/4] fix: use TriggerResponseMessage_Ack instead of an empty msg --- .../internal/core/services/capability/capabilities.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/loop/internal/core/services/capability/capabilities.go b/pkg/loop/internal/core/services/capability/capabilities.go index e6b9778e9..fa1cd3d55 100644 --- a/pkg/loop/internal/core/services/capability/capabilities.go +++ b/pkg/loop/internal/core/services/capability/capabilities.go @@ -227,14 +227,12 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge if err = server.Send(msg); err != nil { return fmt.Errorf("failed sending error response for trigger registration %s: %w", request, err) } - return fmt.Errorf("error registering trigger: %w", err) + return nil } // Send ACK response to client msg := &capabilitiespb.TriggerResponseMessage{ - Message: &capabilitiespb.TriggerResponseMessage_Response{ - Response: &capabilitiespb.TriggerResponse{}, - }, + Message: &capabilitiespb.TriggerResponseMessage_Ack{}, } if err = server.Send(msg); err != nil { return fmt.Errorf("failed sending ACK response for trigger registration %s: %w", request, err) @@ -299,7 +297,7 @@ func (t *triggerExecutableClient) RegisterTrigger(ctx context.Context, req capab return nil, fmt.Errorf("failed to receive registering trigger ack message: %w", err) } - if ackMsg.GetResponse().GetError() != "" { + if ackMsg.GetAck() == nil { return nil, errors.New(fmt.Sprintf("failed registering trigger: %s", ackMsg.GetResponse().GetError())) } From 10352b5b0166d4f9c00d1a5746e538729b12a14e Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Wed, 19 Feb 2025 15:14:32 +0100 Subject: [PATCH 4/4] chore: return server.Send(msg) --- pkg/loop/internal/core/services/capability/capabilities.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/loop/internal/core/services/capability/capabilities.go b/pkg/loop/internal/core/services/capability/capabilities.go index fa1cd3d55..43925f666 100644 --- a/pkg/loop/internal/core/services/capability/capabilities.go +++ b/pkg/loop/internal/core/services/capability/capabilities.go @@ -224,10 +224,7 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge }, }, } - if err = server.Send(msg); err != nil { - return fmt.Errorf("failed sending error response for trigger registration %s: %w", request, err) - } - return nil + return server.Send(msg) } // Send ACK response to client