Skip to content

Commit

Permalink
split consumer and producer topic configuration (#50)
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka authored Feb 6, 2024
1 parent 12c9602 commit 7035e5a
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
4 changes: 2 additions & 2 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewPulsarMessageReader(cfg config.Config, pulsarClient pulsarconnector.Clie
panic(err)
}
readerName := fmt.Sprintf("%s-%s", cfg.Backend.Subscription, hostname)
topic := pulsarconnector.BuildPersistentTopic(pulsarClient.GetConfig().Tenant, pulsarClient.GetConfig().Namespace, cfg.Backend.Topic)
topic := pulsarconnector.BuildPersistentTopic(pulsarClient.GetConfig().Tenant, pulsarClient.GetConfig().Namespace, cfg.Backend.ConsumerTopic)
logger.L().Debug("creating new pulsar reader",
helpers.String("readerName", readerName),
helpers.String("topic", topic))
Expand Down Expand Up @@ -333,7 +333,7 @@ type PulsarMessageProducer struct {
}

func NewPulsarMessageProducer(cfg config.Config, pulsarClient pulsarconnector.Client) (*PulsarMessageProducer, error) {
topic := cfg.Backend.Topic
topic := cfg.Backend.ProducerTopic
fullTopic := pulsarconnector.BuildPersistentTopic(pulsarClient.GetConfig().Tenant, pulsarClient.GetConfig().Namespace, topic)

options := pulsar.ProducerOptions{
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type Backend struct {
AuthenticationServer *AuthenticationServerConfig `mapstructure:"authenticationServer"`
Subscription string `mapstructure:"subscription"`
PulsarConfig *pulsarconfig.PulsarConfig `mapstructure:"pulsarConfig"`
Topic pulsarconnector.TopicName `mapstructure:"topic"`
ProducerTopic pulsarconnector.TopicName `mapstructure:"producerTopic"`
ConsumerTopic pulsarconnector.TopicName `mapstructure:"consumerTopic"`
Prometheus *PrometheusConfig `mapstructure:"prometheusConfig"`
ReconciliationTask *ReconciliationTaskConfig `mapstructure:"reconciliationTaskConfig"`
}
Expand Down
3 changes: 2 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestLoadConfig(t *testing.T) {
RedeliveryDelaySeconds: 5,
MaxDeliveryAttempts: 20,
},
Topic: "synchronizer",
ProducerTopic: "synchronizer",
ConsumerTopic: "synchronizer",
},
},
},
Expand Down
3 changes: 2 additions & 1 deletion configuration/server/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"backend": {
"subscription": "synchronizer-server",
"topic": "synchronizer",
"producerTopic": "synchronizer",
"consumerTopic": "synchronizer",
"pulsarConfig": {
"url": "pulsar://localhost:6650",
"tenant": "armo",
Expand Down

0 comments on commit 7035e5a

Please sign in to comment.