Skip to content

Commit

Permalink
fix: syncronize RegisterTrigger client and server using a first ack/e…
Browse files Browse the repository at this point in the history
…rr message
  • Loading branch information
agparadiso committed Feb 14, 2025
1 parent d2aaa39 commit 1e81679
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/loop/internal/core/services/capability/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,31 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
}
responseCh, err := t.impl.RegisterTrigger(server.Context(), req)
if err != nil {
// the first message sent to the client will be an ack or error message, this is done in order to syncronize the client and server and avoid
// errors to unregister not found triggers. If the error is not nil, we send an error message to the client and return the error
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Response{
Response: &capabilitiespb.TriggerResponse{
Error: err.Error(),
},
},
}
if err = server.Send(msg); err != nil {
return fmt.Errorf("failed sending error response for trigger registration %s: %w", request, err)
}
return fmt.Errorf("error registering trigger: %w", err)
}

// Send ACK response to client
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Response{
Response: &capabilitiespb.TriggerResponse{},
},
}
if err = server.Send(msg); err != nil {
return fmt.Errorf("failed sending ACK response for trigger registration %s: %w", request, err)
}

defer func() {
// Always attempt to unregister the trigger to ensure any related resources are cleaned up
err = t.impl.UnregisterTrigger(server.Context(), req)
Expand Down Expand Up @@ -270,6 +292,17 @@ func (t *triggerExecutableClient) RegisterTrigger(ctx context.Context, req capab
return nil, fmt.Errorf("error registering trigger: %w", err)
}

// In order to ensure the registration is successful, we need to wait for the first message from the server.
// This will be an ack or error message. If the error is not nil, we return an error.
ackMsg, err := responseStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive registering trigger ack message: %w", err)
}

if ackMsg.GetResponse().GetError() != "" {
return nil, errors.New(fmt.Sprintf("failed registering trigger: %s", ackMsg.GetResponse().GetError()))
}

return forwardTriggerResponsesToChannel(ctx, t.Logger, req, responseStream.Recv)
}

Expand Down

0 comments on commit 1e81679

Please sign in to comment.