Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Unify subscription id with client message id #6847

76 changes: 50 additions & 26 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ func (c *Controller) readMessages(ctx context.Context) error {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error reading message", "", "", ""))
wrapErrorMessage(InvalidMessage, "error reading message", ""),
)
continue
}

Expand All @@ -315,7 +316,8 @@ func (c *Controller) readMessages(ctx context.Context) error {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error parsing message", "", "", ""))
wrapErrorMessage(InvalidMessage, "error parsing message", ""),
)
continue
}
}
Expand Down Expand Up @@ -358,24 +360,32 @@ func (c *Controller) handleMessage(ctx context.Context, message json.RawMessage)
}

func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) {
subscriptionID, err := c.parseOrCreateSubscriptionID(msg.SubscriptionID)
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error parsing subscription id", msg.SubscriptionID),
)
return
}

// register new provider
provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream)
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error creating data provider", msg.ClientMessageID, models.SubscribeAction, ""),
wrapErrorMessage(InvalidArgument, "error creating data provider", subscriptionID.String()),
)
return
}
c.dataProviders.Add(provider.ID(), provider)
c.dataProviders.Add(subscriptionID, provider)

// write OK response to client
responseOk := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
ClientMessageID: msg.ClientMessageID,
Success: true,
SubscriptionID: provider.ID().String(),
SubscriptionID: subscriptionID.String(),
},
}
c.writeResponse(ctx, responseOk)
Expand All @@ -388,44 +398,42 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(SubscriptionError, "subscription finished with error", "", "", ""),
wrapErrorMessage(InternalError, "internal error", subscriptionID.String()),
)
}

c.dataProvidersGroup.Done()
c.dataProviders.Remove(provider.ID())
c.dataProviders.Remove(subscriptionID)
}()
}

func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) {
id, err := uuid.Parse(msg.SubscriptionID)
subscriptionID, err := uuid.Parse(msg.SubscriptionID)
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error parsing subscription ID", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
wrapErrorMessage(InvalidArgument, "error parsing subscription id", msg.SubscriptionID),
)
return
}

provider, ok := c.dataProviders.Get(id)
provider, ok := c.dataProviders.Get(subscriptionID)
if !ok {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "subscription not found", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
wrapErrorMessage(NotFound, "subscription not found", subscriptionID.String()),
)
return
}

provider.Close()
c.dataProviders.Remove(id)
c.dataProviders.Remove(subscriptionID)

responseOk := models.UnsubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
ClientMessageID: msg.ClientMessageID,
Success: true,
SubscriptionID: msg.SubscriptionID,
SubscriptionID: subscriptionID.String(),
},
}
c.writeResponse(ctx, responseOk)
Expand All @@ -445,15 +453,16 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "error listing subscriptions", msg.ClientMessageID, models.ListSubscriptionsAction, ""),
wrapErrorMessage(NotFound, "error listing subscriptions", ""),
)
return
}

responseOk := models.ListSubscriptionsMessageResponse{
Success: true,
ClientMessageID: msg.ClientMessageID,
Subscriptions: subs,
BaseMessageResponse: models.BaseMessageResponse{
SubscriptionID: msg.SubscriptionID,
},
Subscriptions: subs,
}
c.writeResponse(ctx, responseOk)
}
Expand Down Expand Up @@ -490,15 +499,30 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
}
}

func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse {
func wrapErrorMessage(code Code, message string, subscriptionID string) models.BaseMessageResponse {
return models.BaseMessageResponse{
ClientMessageID: msgId,
Success: false,
SubscriptionID: subscriptionID,
SubscriptionID: subscriptionID,
Error: models.ErrorMessage{
Code: int(code),
Message: message,
Action: action,
},
}
}

func (c *Controller) parseOrCreateSubscriptionID(id string) (uuid.UUID, error) {
// if client didn't provide subscription id, we create one for him
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
if id == "" {
return uuid.New(), nil
}

newID, err := uuid.Parse(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jribbink what do you think about requiring clients pass UUIDs (32 character hex strings) for the client ids? Is that reasonable or too restrictive?

Copy link
Contributor Author

@illia-malachyn illia-malachyn Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, if we require clients to create subscripion_id on their side, we can go with a simple integer value for subscription_id as a client will have control over random number generation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we decided not to require clients to provide valid uuid but rather a random string containing N characters

if err != nil {
return uuid.Nil, err
}

if c.dataProviders.Has(newID) {
return uuid.Nil, fmt.Errorf("subscription id is already in use")
}

return newID, nil
}
Loading
Loading