diff --git a/data/service/service/standard.go b/data/service/service/standard.go index 70a87965a..132f8cd95 100644 --- a/data/service/service/standard.go +++ b/data/service/service/standard.go @@ -483,22 +483,13 @@ func (s *Standard) initializeAlertsEventsHandler() error { return err } - // In addition to the CloudEventsConfig, additional specific config values - // are needed. - config := &struct { - KafkaAlertsTopics []string `envconfig:"KAFKA_ALERTS_TOPICS" default:"alerts,deviceData.alerts"` - KafkaAlertsGroupID string `envconfig:"KAFKA_ALERTS_CONSUMER_GROUP" required:"true"` - }{} - if err := envconfig.Process("", config); err != nil { - return errors.Wrap(err, "Unable to process envconfig") - } - + topics := []string{"data.alerts", "data.deviceData.alerts"} // Some kafka topics use a `-` as a prefix. But MongoDB CDC topics are created with // `.`. This code is using CDC topics, so ensuring that a `.` is used for alerts events // lines everything up as expected. topicPrefix := strings.ReplaceAll(commonConfig.KafkaTopicPrefix, "-", ".") - prefixedTopics := make([]string, 0, len(config.KafkaAlertsTopics)) - for _, topic := range config.KafkaAlertsTopics { + prefixedTopics := make([]string, 0, len(topics)) + for _, topic := range topics { prefixedTopics = append(prefixedTopics, topicPrefix+topic) } @@ -517,7 +508,7 @@ func (s *Standard) initializeAlertsEventsHandler() error { runnerCfg := dataEvents.SaramaRunnerConfig{ Brokers: commonConfig.KafkaBrokers, - GroupID: config.KafkaAlertsGroupID, + GroupID: "alerts", Topics: prefixedTopics, Sarama: commonConfig.SaramaConfig, MessageConsumer: &dataEvents.AlertsEventsConsumer{