From 5100fed0cbe684dad59359818245b223d12620fe Mon Sep 17 00:00:00 2001 From: Amir Malka Date: Sun, 4 Feb 2024 16:24:02 +0200 Subject: [PATCH] Start consuming pulsar messages after reader is created (#43) * start consuming pulsar message after reader is created Signed-off-by: Amir Malka * fix Signed-off-by: Amir Malka --------- Signed-off-by: Amir Malka --- adapters/backend/v1/adapter.go | 8 +------- adapters/backend/v1/pulsar.go | 16 +++++++++++----- cmd/server/main.go | 5 +++-- core/synchronizer_integration_test.go | 5 +++-- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/adapters/backend/v1/adapter.go b/adapters/backend/v1/adapter.go index 5264e69..5061e59 100644 --- a/adapters/backend/v1/adapter.go +++ b/adapters/backend/v1/adapter.go @@ -21,15 +21,13 @@ type Adapter struct { connMapMutex sync.RWMutex connectionMap map[string]domain.ClientIdentifier // -> producer messaging.MessageProducer - reader messaging.MessageReader once sync.Once mainContext context.Context } -func NewBackendAdapter(mainContext context.Context, messageProducer messaging.MessageProducer, messageReader messaging.MessageReader) *Adapter { +func NewBackendAdapter(mainContext context.Context, messageProducer messaging.MessageProducer) *Adapter { adapter := &Adapter{ producer: messageProducer, - reader: messageReader, mainContext: mainContext, connectionMap: make(map[string]domain.ClientIdentifier), } @@ -92,10 +90,6 @@ func (b *Adapter) RegisterCallbacks(ctx context.Context, callbacks domain.Callba } func (b *Adapter) Start(ctx context.Context) error { - b.once.Do(func() { - b.reader.Start(b.mainContext, b) - }) - b.connMapMutex.Lock() defer b.connMapMutex.Unlock() incomingId := utils.ClientIdentifierFromContext(ctx) diff --git a/adapters/backend/v1/pulsar.go b/adapters/backend/v1/pulsar.go index e98aae1..095a39f 100644 --- a/adapters/backend/v1/pulsar.go +++ b/adapters/backend/v1/pulsar.go @@ -74,10 +74,17 @@ func (c *PulsarMessageReader) Start(mainCtx context.Context, adapter adapters.Ad }() go func() { logger.L().Info("starting to listening on pulsar message channel") - _ = c.listenOnMessageChannel(mainCtx, adapter) + c.listenOnMessageChannel(mainCtx, adapter) }() } +func (c *PulsarMessageReader) stop() { + logger.L().Info("closing pulsar reader") + c.reader.Close() + logger.L().Info("closing pulsar message channel") + close(c.messageChannel) +} + func (c *PulsarMessageReader) readerLoop(ctx context.Context) { for { msg, err := c.reader.Next(ctx) @@ -94,13 +101,12 @@ func (c *PulsarMessageReader) readerLoop(ctx context.Context) { } } -func (c *PulsarMessageReader) listenOnMessageChannel(ctx context.Context, adapter adapters.Adapter) error { - defer c.reader.Close() +func (c *PulsarMessageReader) listenOnMessageChannel(ctx context.Context, adapter adapters.Adapter) { + defer c.stop() for { select { case <-ctx.Done(): - close(c.messageChannel) - return nil + return case msg := <-c.messageChannel: msgID := utils.PulsarMessageIDtoString(msg.ID()) if err := c.handleSingleSynchronizerMessage(ctx, adapter, msg); err != nil { diff --git a/cmd/server/main.go b/cmd/server/main.go index d52cfb0..24aaf43 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -64,7 +64,8 @@ func main() { logger.L().Fatal("failed to create pulsar reader", helpers.Error(err), helpers.String("config", fmt.Sprintf("%+v", cfg.Backend.PulsarConfig))) } - adapter = backend.NewBackendAdapter(ctx, pulsarProducer, pulsarReader) + adapter = backend.NewBackendAdapter(ctx, pulsarProducer) + pulsarReader.Start(ctx, adapter) } else { // mock adapter logger.L().Info("initializing mock adapter") @@ -102,7 +103,7 @@ func main() { id := utils.ClientIdentifierFromContext(r.Context()) synchronizer, err := core.NewSynchronizerServer(r.Context(), adapter, conn) if err != nil { - logger.L().Error("error during sync, closing listener", + logger.L().Error("error during creating synchronizer server instance", helpers.String("account", id.Account), helpers.String("cluster", id.Cluster), helpers.String("connectionId", id.ConnectionId), diff --git a/core/synchronizer_integration_test.go b/core/synchronizer_integration_test.go index 7fea3e3..c929b37 100644 --- a/core/synchronizer_integration_test.go +++ b/core/synchronizer_integration_test.go @@ -527,7 +527,8 @@ func createAndStartSynchronizerServer(t *testing.T, pulsarUrl, pulsarAdminUrl st require.NoError(t, err) ctx, cancel := context.WithCancel(cluster.ctx) - serverAdapter := backend.NewBackendAdapter(ctx, pulsarProducer, pulsarReader) + serverAdapter := backend.NewBackendAdapter(ctx, pulsarProducer) + pulsarReader.Start(ctx, serverAdapter) synchronizerServer, err := NewSynchronizerServer(ctx, serverAdapter, serverConn) require.NoError(t, err) @@ -986,7 +987,7 @@ func TestSynchronizer_TC08(t *testing.T) { // add applicationprofile to k8s _, err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{}) require.NoError(t, err) - time.Sleep(5 * time.Second) + time.Sleep(15 * time.Second) // check object in postgres _, objFound, err := td.processor.GetObjectFromPostgres(td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name) assert.NoError(t, err)