Skip to content

Commit

Permalink
Fix ingestion_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
andylibrian committed May 2, 2024
1 parent 7345382 commit 9441acb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 37 deletions.
9 changes: 4 additions & 5 deletions pkg/protoqueue/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package protoqueue
import (
"errors"
"fmt"
"log"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -189,14 +188,14 @@ func (j *JetStream) publishWithRetry(subject string, data []byte) error {
RetryInterval := 5 * time.Second
var err error
for i := 0; i < maxRetries; i++ {
// Publish message
_, err = j.Conn.JSContext.Publish(subject, data)
if err == nil {
// Message published successfully
return nil
}
log.Printf("Publish attempt %d failed: %v", i+1, err)
// Wait before retrying

j.logger.Warn("Publish attempt failed")
j.logger.WithError(err).Warnf("Publish attempt %d failed", i+1)

time.Sleep(RetryInterval)
}
return err
Expand Down
39 changes: 7 additions & 32 deletions pkg/server/ingestion_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type IngestionWorker struct {
eventStore store.EventStore
IngestionQueue protoqueue.QueueSubscriber
ingestionQueueForEventDetection protoqueue.QueueSubscriber
IngestionQueueForEventDetection protoqueue.QueueSubscriber
logger *logrus.Logger
}

Expand All @@ -30,7 +30,7 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu
return &IngestionWorker{
eventStore: eventStore,
IngestionQueue: queueSubscriber,
ingestionQueueForEventDetection: queueSubscriberForEventDetection,
IngestionQueueForEventDetection: queueSubscriberForEventDetection,
logger: logger,
}
}
Expand All @@ -39,7 +39,7 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu
// It uses a goroutine and a buffered channel to read events from the queue in the background.
func (iw *IngestionWorker) Start() {
go iw.loopConsumeQueue(iw.IngestionQueue)
go iw.loopConsumeQueueEventDetection(iw.ingestionQueueForEventDetection)
go iw.loopConsumeQueue(iw.IngestionQueueForEventDetection)
}

func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) {
Expand All @@ -59,13 +59,11 @@ func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) {
}
}()

go func() {
defer iw.logger.Info("stopped consuming events from ingestion queue")
defer iw.logger.Info("stopped consuming events from ingestion queue")

for event := range eventChan {
iw.processEvent(event)
}
}()
for event := range eventChan {
iw.processEvent(event)
}
}

func (iw *IngestionWorker) processEvent(event *tarianpb.Event) {
Expand All @@ -78,26 +76,3 @@ func (iw *IngestionWorker) processEvent(event *tarianpb.Event) {
iw.logger.WithError(err).Error("error while processing event")
}
}

func (iw *IngestionWorker) loopConsumeQueueEventDetection(queue protoqueue.QueueSubscriber) {
for {
msg, err := queue.NextMessage(&tarianpb.Event{})
if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
continue
}

event, ok := msg.(*tarianpb.Event)
if !ok {
// iw.logger.WithError(err).Error("error while processing event")
continue
}

event.ServerTimestamp = timestamppb.Now()
_ = iw.eventStore.Add(event)

// if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
// }
}
}

0 comments on commit 9441acb

Please sign in to comment.