Skip to content

Commit

Permalink
continued: handle extended deadline for async sending goroutine better
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
Pascal-Delange committed Sep 2, 2024
1 parent 2fb6273 commit 5600947
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
19 changes: 15 additions & 4 deletions repositories/convoy_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ func NewConvoyRepository(convoyClientProvider ConvoyClientProvider, limit int) C
convoyClientProvider = noOpConvoyClientProvider{}
}

instanceLimit := rate.Limit(limit) / 2 // other instances may be running in parallel. If we want to do more precise, we'll need a distributed limiter
// other instances may be running in parallel. If we want to do it more precisely, we'll need a distributed limiter
instanceLimit := rate.Limit(limit) / 2
burst := limit / 3
if burst == 0 {
burst = 1
}
return ConvoyRepository{
convoyClientProvider: convoyClientProvider,
limiter: rate.NewLimiter(instanceLimit, burst),
Expand Down Expand Up @@ -73,8 +77,15 @@ func getName(ownerId string, eventTypes []string) string {
func (repo ConvoyRepository) SendWebhookEvent(ctx context.Context, webhookEvent models.WebhookEvent) error {
err := repo.limiter.Wait(ctx)
if err != nil {
// happens if the context is canceled or the wait time is expected to be larger than the context deadline
return errors.Wrap(err, "can't send convoy event: rate limit exceeded")
// Happens if the context is canceled or the wait time is expected to be larger than the context deadline.
// The former should not happen in the current implementation, but the latter could happen if the function is called
// rapidly and if the context passed has a too short deadline.
logger := utils.LoggerFromContext(ctx)
logger.WarnContext(ctx,
"Internal rate limit exceeded, the webhook sending will be retried asynchronously",
"webhookEventId", webhookEvent.Id,
)
return nil
}

projectId := repo.convoyClientProvider.GetProjectID()
Expand All @@ -98,7 +109,7 @@ func (repo ConvoyRepository) SendWebhookEvent(ctx context.Context, webhookEvent
if fanoutEvent.StatusCode() == 429 {
logger := utils.LoggerFromContext(ctx)
logger.WarnContext(ctx,
"rate limit exceeded, the webhook sending will be retried asynchronously",
"Convoy rate limit exceeded, the webhook sending will be retried asynchronously",
"ownerId", ownerId, "webhookEventId", webhookEvent.Id,
)
return nil
Expand Down
5 changes: 3 additions & 2 deletions usecases/webhook_events_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
MAX_CONCURRENT_WEBHOOKS_SENT = 20
WEBHOOKS_SEND_MAX_RETRIES = 24
DEFAULT_FAILED_WEBHOOKS_PAGE_SIZE = 1000
ASYNC_WEBHOOKS_SEND_TIMEOUT = 5 * time.Minute
)

type convoyWebhookEventRepository interface {
Expand Down Expand Up @@ -112,9 +113,9 @@ func (usecase WebhookEventsUsecase) SendWebhookEventAsync(ctx context.Context, w
logger := utils.LoggerFromContext(ctx).With("webhook_event_id", webhookEventId)
ctx = utils.StoreLoggerInContext(ctx, logger)

// go routine to send the webhook event asynchronously, with a new context and timeout and a child span
// goroutine to send the webhook event asynchronously, with a new context and timeout and a child span
go func() {
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*3)
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), ASYNC_WEBHOOKS_SEND_TIMEOUT)
defer cancel()
tracer := utils.OpenTelemetryTracerFromContext(ctx)
ctx, span := tracer.Start(
Expand Down

0 comments on commit 5600947

Please sign in to comment.