Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka committed Nov 20, 2024
1 parent e6925c3 commit a800c18
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions tests/synchronizer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesClus
}

func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (testcontainers.Container, string, string) {
pulsarC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
pulsarC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "apachepulsar/pulsar:2.11.0",
Image: "apachepulsar/pulsar:3.0.3",
Cmd: []string{"bin/pulsar", "standalone"},
ExposedPorts: []string{brokerPort + ":6650/tcp", adminPort + ":8080/tcp"},
WaitingFor: wait.ForAll(
Expand All @@ -454,6 +454,7 @@ func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort strin
},
Started: true,
})
require.NoError(t, err)
require.NotNil(t, pulsarC)
pulsarUrl, err := pulsarC.PortEndpoint(ctx, "6650", "pulsar")
require.NoError(t, err)
Expand Down Expand Up @@ -652,18 +653,24 @@ func initIntegrationTest(t *testing.T) *Test {
require.NoError(t, err)
s3 := s3connector.NewS3Mock()
ingesterProcessor := resourceprocessor.NewKubernetesResourceProcessor(&s3, postgresconnectordal.NewPostgresDAL(ingesterPgClient))
ctx = context.WithValue(ctx, "resourceProcessor", ingesterProcessor)
onFinishProducer, err := pulsarClient.NewProducer(pulsarconnector.WithProducerTopic("onFinishTopic"), pulsarconnector.WithProducerNamespace("armo", "kubescape"))
require.NoError(t, err)
gvrProducerMap, err := synchronizeringester.InitGVRProducerMap(pulsarClient, ingesterConf.SynchronizerIngesterConfig)
require.NoError(t, err)
isReady := make(chan bool)

go synchronizeringester.ConsumeSynchronizerMessages(
gvrProducerMap,
ingesters.WithPulsarClient(pulsarClient),
ingesters.WithIngesterConfig(ingesterConf.SynchronizerIngesterConfig),
ingesters.WithPGConnector(ingesterPgClient),
ingesters.WithContext(ctx),
ingesters.WithSynchronizerProducer(ingesterProducer),
ingesters.WithOnFinishProducer(onFinishProducer))
ingesters.WithOnFinishProducer(onFinishProducer),
ingesters.WithIsReadyChannel(isReady),
)
<-isReady

// fake websocket
clientConn1, serverConn1 := net.Pipe() // client1 -> server1
Expand Down

0 comments on commit a800c18

Please sign in to comment.