Skip to content

Commit

Permalink
Start consuming pulsar messages after reader is created (#43)
Browse files Browse the repository at this point in the history
* start consuming pulsar message after reader is created

Signed-off-by: Amir Malka <[email protected]>

* fix

Signed-off-by: Amir Malka <[email protected]>

---------

Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka authored Feb 4, 2024
1 parent 0d20518 commit 5100fed
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
8 changes: 1 addition & 7 deletions adapters/backend/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ type Adapter struct {
connMapMutex sync.RWMutex
connectionMap map[string]domain.ClientIdentifier // <cluster, account> -> <connection string>
producer messaging.MessageProducer
reader messaging.MessageReader
once sync.Once
mainContext context.Context
}

func NewBackendAdapter(mainContext context.Context, messageProducer messaging.MessageProducer, messageReader messaging.MessageReader) *Adapter {
func NewBackendAdapter(mainContext context.Context, messageProducer messaging.MessageProducer) *Adapter {
adapter := &Adapter{
producer: messageProducer,
reader: messageReader,
mainContext: mainContext,
connectionMap: make(map[string]domain.ClientIdentifier),
}
Expand Down Expand Up @@ -92,10 +90,6 @@ func (b *Adapter) RegisterCallbacks(ctx context.Context, callbacks domain.Callba
}

func (b *Adapter) Start(ctx context.Context) error {
b.once.Do(func() {
b.reader.Start(b.mainContext, b)
})

b.connMapMutex.Lock()
defer b.connMapMutex.Unlock()
incomingId := utils.ClientIdentifierFromContext(ctx)
Expand Down
16 changes: 11 additions & 5 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ func (c *PulsarMessageReader) Start(mainCtx context.Context, adapter adapters.Ad
}()
go func() {
logger.L().Info("starting to listening on pulsar message channel")
_ = c.listenOnMessageChannel(mainCtx, adapter)
c.listenOnMessageChannel(mainCtx, adapter)
}()
}

func (c *PulsarMessageReader) stop() {
logger.L().Info("closing pulsar reader")
c.reader.Close()
logger.L().Info("closing pulsar message channel")
close(c.messageChannel)
}

func (c *PulsarMessageReader) readerLoop(ctx context.Context) {
for {
msg, err := c.reader.Next(ctx)
Expand All @@ -94,13 +101,12 @@ func (c *PulsarMessageReader) readerLoop(ctx context.Context) {
}
}

func (c *PulsarMessageReader) listenOnMessageChannel(ctx context.Context, adapter adapters.Adapter) error {
defer c.reader.Close()
func (c *PulsarMessageReader) listenOnMessageChannel(ctx context.Context, adapter adapters.Adapter) {
defer c.stop()
for {
select {
case <-ctx.Done():
close(c.messageChannel)
return nil
return
case msg := <-c.messageChannel:
msgID := utils.PulsarMessageIDtoString(msg.ID())
if err := c.handleSingleSynchronizerMessage(ctx, adapter, msg); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func main() {
logger.L().Fatal("failed to create pulsar reader", helpers.Error(err), helpers.String("config", fmt.Sprintf("%+v", cfg.Backend.PulsarConfig)))
}

adapter = backend.NewBackendAdapter(ctx, pulsarProducer, pulsarReader)
adapter = backend.NewBackendAdapter(ctx, pulsarProducer)
pulsarReader.Start(ctx, adapter)
} else {
// mock adapter
logger.L().Info("initializing mock adapter")
Expand Down Expand Up @@ -102,7 +103,7 @@ func main() {
id := utils.ClientIdentifierFromContext(r.Context())
synchronizer, err := core.NewSynchronizerServer(r.Context(), adapter, conn)
if err != nil {
logger.L().Error("error during sync, closing listener",
logger.L().Error("error during creating synchronizer server instance",
helpers.String("account", id.Account),
helpers.String("cluster", id.Cluster),
helpers.String("connectionId", id.ConnectionId),
Expand Down
5 changes: 3 additions & 2 deletions core/synchronizer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ func createAndStartSynchronizerServer(t *testing.T, pulsarUrl, pulsarAdminUrl st
require.NoError(t, err)

ctx, cancel := context.WithCancel(cluster.ctx)
serverAdapter := backend.NewBackendAdapter(ctx, pulsarProducer, pulsarReader)
serverAdapter := backend.NewBackendAdapter(ctx, pulsarProducer)
pulsarReader.Start(ctx, serverAdapter)
synchronizerServer, err := NewSynchronizerServer(ctx, serverAdapter, serverConn)
require.NoError(t, err)

Expand Down Expand Up @@ -986,7 +987,7 @@ func TestSynchronizer_TC08(t *testing.T) {
// add applicationprofile to k8s
_, err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{})
require.NoError(t, err)
time.Sleep(5 * time.Second)
time.Sleep(15 * time.Second)
// check object in postgres
_, objFound, err := td.processor.GetObjectFromPostgres(td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
assert.NoError(t, err)
Expand Down

0 comments on commit 5100fed

Please sign in to comment.