Skip to content

Commit

Permalink
added retry logic to publish the event
Browse files Browse the repository at this point in the history
  • Loading branch information
kumari-anupam authored and andylibrian committed May 2, 2024
1 parent fb000c8 commit 631249e
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pkg/protoqueue/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package protoqueue
import (
"errors"
"fmt"
"log"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -175,14 +176,32 @@ func (j *JetStream) Publish(queuedMessage proto.Message) error {
return fmt.Errorf("nats: jetstream publish: failed to marshal queued message: %w", err)
}

_, err = j.Conn.JSContext.Publish(j.StreamName, data)
err = j.publishWithRetry(j.StreamName, data)
if err != nil {
return fmt.Errorf("nats: jetstream publish: failed to publish message: %w", err)
}

return nil
}

func (j *JetStream) publishWithRetry(subject string, data []byte) error {
maxRetries := 5
RetryInterval := 5 * time.Second
var err error
for i := 0; i < maxRetries; i++ {
// Publish message
_, err = j.Conn.JSContext.Publish(subject, data)
if err == nil {
// Message published successfully
return nil
}
log.Printf("Publish attempt %d failed: %v", i+1, err)
// Wait before retrying
time.Sleep(RetryInterval)
}
return err
}

// NextMessage retrieves the next message from the JetStream queue and unmarshals it into the provided protobuf message.
//
// Parameters:
Expand Down

0 comments on commit 631249e

Please sign in to comment.