diff --git a/pkg/server/ingestion_worker.go b/pkg/server/ingestion_worker.go index ff2390f..a37008a 100644 --- a/pkg/server/ingestion_worker.go +++ b/pkg/server/ingestion_worker.go @@ -59,13 +59,11 @@ func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) { } }() - go func() { - defer iw.logger.Info("stopped consuming events from ingestion queue") + defer iw.logger.Info("stopped consuming events from ingestion queue") - for event := range eventChan { - iw.processEvent(event) - } - }() + for event := range eventChan { + iw.processEvent(event) + } } func (iw *IngestionWorker) processEvent(event *tarianpb.Event) {