Skip to content

Commit

Permalink
Add topic prefix configuration option
Browse files Browse the repository at this point in the history
  • Loading branch information
toddkazakov committed Jun 22, 2020
1 parent 59c0871 commit 75a3e18
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions mailer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ import (
)

type KafkaMailerConfig struct {
KafkaBrokers []string `envconfig:"TIDEPOOL_KAFKA_BROKERS" required:"true"`
KafkaTopic string `envconfig:"TIDEPOOL_KAFKA_EMAILS_TOPIC" required:"true"`
KafkaBrokers []string `envconfig:"TIDEPOOL_KAFKA_BROKERS" required:"true"`
KafkaTopicPrefix string `envconfig:"TIDEPOOL_KAFKA_TOPIC_PREFIX" required:"true"`
KafkaTopic string `envconfig:"TIDEPOOL_KAFKA_EMAILS_TOPIC" required:"true"`
}

func (k *KafkaMailerConfig) GetPrefixedTopic() string {
return fmt.Sprintf("%s%s", k.KafkaTopicPrefix, k.KafkaTopic)
}

type KafkaMailer struct {
Expand Down Expand Up @@ -40,8 +45,9 @@ func (k *KafkaMailer) Send(ctx context.Context, email *Email) error {
return err
}

topic := k.cfg.GetPrefixedTopic()
err = k.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &k.cfg.KafkaTopic, Partition: kafka.PartitionAny},
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: b,
}, k.deliveryChan)
return err
Expand Down

0 comments on commit 75a3e18

Please sign in to comment.