diff --git a/tests/synchronizer_integration_test.go b/tests/synchronizer_integration_test.go index 7c424d0..3e4fe9b 100644 --- a/tests/synchronizer_integration_test.go +++ b/tests/synchronizer_integration_test.go @@ -438,9 +438,9 @@ func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesClus } func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (testcontainers.Container, string, string) { - pulsarC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + pulsarC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ - Image: "apachepulsar/pulsar:2.11.0", + Image: "apachepulsar/pulsar:3.0.3", Cmd: []string{"bin/pulsar", "standalone"}, ExposedPorts: []string{brokerPort + ":6650/tcp", adminPort + ":8080/tcp"}, WaitingFor: wait.ForAll( @@ -454,6 +454,7 @@ func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort strin }, Started: true, }) + require.NoError(t, err) require.NotNil(t, pulsarC) pulsarUrl, err := pulsarC.PortEndpoint(ctx, "6650", "pulsar") require.NoError(t, err) @@ -652,10 +653,13 @@ func initIntegrationTest(t *testing.T) *Test { require.NoError(t, err) s3 := s3connector.NewS3Mock() ingesterProcessor := resourceprocessor.NewKubernetesResourceProcessor(&s3, postgresconnectordal.NewPostgresDAL(ingesterPgClient)) + ctx = context.WithValue(ctx, "resourceProcessor", ingesterProcessor) onFinishProducer, err := pulsarClient.NewProducer(pulsarconnector.WithProducerTopic("onFinishTopic"), pulsarconnector.WithProducerNamespace("armo", "kubescape")) require.NoError(t, err) gvrProducerMap, err := synchronizeringester.InitGVRProducerMap(pulsarClient, ingesterConf.SynchronizerIngesterConfig) require.NoError(t, err) + isReady := make(chan bool) + go synchronizeringester.ConsumeSynchronizerMessages( gvrProducerMap, ingesters.WithPulsarClient(pulsarClient), @@ -663,7 +667,10 @@ func initIntegrationTest(t *testing.T) *Test { ingesters.WithPGConnector(ingesterPgClient), ingesters.WithContext(ctx), ingesters.WithSynchronizerProducer(ingesterProducer), - ingesters.WithOnFinishProducer(onFinishProducer)) + ingesters.WithOnFinishProducer(onFinishProducer), + ingesters.WithIsReadyChannel(isReady), + ) + <-isReady // fake websocket clientConn1, serverConn1 := net.Pipe() // client1 -> server1