diff --git a/kafka/config.go b/kafka/config.go deleted file mode 100644 index 109beb7..0000000 --- a/kafka/config.go +++ /dev/null @@ -1,14 +0,0 @@ -package kafka - -import "time" - -type Config struct { - KafkaBrokers []string `envconfig:"TIDEPOOL_KAFKA_BROKERS" validate:"required"` - KafkaTopic string `envconfig:"TIDEPOOL_KAFKA_EMAILS_TOPIC" validate:"required"` -} - -type ConsumerConfig struct { - Config - KafkaPollInterval time.Duration `envconfig:"TIDEPOOL_KAFKA_EMAILS_POOL_INTERVAL" default:"100ms" validate:"required"` - ConsumerGroup string `envconfig:"TIDEPOOL_KAFKA_CONSUMER_GROUP" default:"mailer" validate:"required"` -} diff --git a/kafka/producer.go b/kafka/producer.go deleted file mode 100644 index ced0af8..0000000 --- a/kafka/producer.go +++ /dev/null @@ -1,83 +0,0 @@ -package kafka - -import ( - "context" - "encoding/json" - "errors" - "fmt" - kfka "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/tidepool-org/mailer/mailer" - "go.uber.org/zap" -) - -type EmailProducer struct { - cfg *Config - logger *zap.SugaredLogger - producer *kfka.Producer -} - -var _ mailer.Mailer = &EmailProducer{} - -func NewEmailProducer(cfg *Config, logger *zap.SugaredLogger) (*EmailProducer, error) { - producer, err := kfka.NewProducer(&kfka.ConfigMap{"bootstrap.servers": cfg.KafkaBrokers}) - if err != nil { - return nil, err - } - - return &EmailProducer{ - cfg: cfg, - logger: logger, - producer: producer, - }, nil -} - -func (k *EmailProducer) Close(timeoutMs int) (err error) { - outstandingEvents := k.producer.Flush(timeoutMs) - if outstandingEvents != 0 { - err = errors.New(fmt.Sprintf("%v events were not delivered", outstandingEvents)) - } - k.producer.Close() - return -} - -func (k *EmailProducer) Send(ctx context.Context, email *mailer.Email) error { - b, err := json.Marshal(email) - if err != nil { - return err - } - - deliveryChan := make(chan kfka.Event) - err = k.producer.Produce(&kfka.Message{ - TopicPartition: kfka.TopicPartition{Topic: &k.cfg.KafkaTopic, Partition: kfka.PartitionAny}, - Value: b, - }, deliveryChan) - if err != nil { - k.logger.Errorw( - "Error enqueueing message", - "topic", k.cfg.KafkaTopic, - "error", err, - ) - return err - } - - e := <-deliveryChan - m := e.(*kfka.Message) - - if m.TopicPartition.Error != nil { - k.logger.Errorw( - "Message delivery failed", - "topic", m.TopicPartition.Topic, - "error", m.TopicPartition.Error, - ) - } else { - k.logger.Debugw( - "Successfully delivered message", - "topic", m.TopicPartition.Topic, - "partition", m.TopicPartition.Partition, - "offset", m.TopicPartition.Offset, - ) - } - - close(deliveryChan) - return m.TopicPartition.Error -} diff --git a/mailer/kafka.go b/mailer/kafka.go new file mode 100644 index 0000000..7c82aa2 --- /dev/null +++ b/mailer/kafka.go @@ -0,0 +1,59 @@ +package mailer + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +type Config struct { + KafkaBrokers []string `envconfig:"TIDEPOOL_KAFKA_BROKERS" validate:"required"` + KafkaFlushTimeout int `envconfig:"TIDEPOOL_KAFKA_FLUSH_TIMEOUT" default:"30s" validate:"required"` + KafkaTopic string `envconfig:"TIDEPOOL_KAFKA_EMAILS_TOPIC" validate:"required"` +} + +type KafkaMailer struct { + cfg *Config + deliveryChan chan kafka.Event + producer *kafka.Producer +} + +var _ Mailer = &KafkaMailer{} + +func NewKafkaMailer(cfg *Config, deliveryChan chan kafka.Event) (*KafkaMailer, error) { + producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": cfg.KafkaBrokers}) + if err != nil { + return nil, err + } + + return &KafkaMailer{ + cfg: cfg, + deliveryChan: deliveryChan, + producer: producer, + }, nil +} + +func (k *KafkaMailer) Send(ctx context.Context, email *Email) error { + b, err := json.Marshal(email) + if err != nil { + return err + } + + err = k.producer.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &k.cfg.KafkaTopic, Partition: kafka.PartitionAny}, + Value: b, + }, k.deliveryChan) + return err +} + +func (k *KafkaMailer) Close(timeoutMs int) (err error) { + outstandingEvents := k.producer.Flush(timeoutMs) + if outstandingEvents != 0 { + err = errors.New(fmt.Sprintf("%v events were not delivered", outstandingEvents)) + } + k.producer.Close() + close(k.deliveryChan) + return +} diff --git a/mailer/template.go b/mailer/template.go new file mode 100644 index 0000000..d44c090 --- /dev/null +++ b/mailer/template.go @@ -0,0 +1,55 @@ +package mailer + +import ( + "bytes" + "fmt" + "github.com/pkg/errors" + "html/template" +) + +type EmailTemplate struct { + body *template.Template + name string + subject *template.Template +} + +func NewEmailTemplate(name string, subject string, body string) (*EmailTemplate, error) { + if name == "" { + return nil, errors.New("email template name cannot be empty") + } + if subject == "" { + return nil, errors.New("email template subject cannot be empty") + } + if body == "" { + return nil, errors.New("email template body cannot be empty") + } + precompiledSubject, err := template.New(fmt.Sprintf("%s_subject", name)).Parse(subject) + if err != nil { + return nil, err + } + precompiledBody, err := template.New(fmt.Sprintf("%s_body", name)).Parse(body) + if err != nil { + return nil, err + } + return &EmailTemplate{ + body: precompiledBody, + name: name, + subject: precompiledSubject, + }, nil +} + +func (e *EmailTemplate) RenderToEmail(params interface{}, email *Email) error { + var subject bytes.Buffer + var body bytes.Buffer + + if err := e.subject.Execute(&subject, params); err != nil { + return err + } + if err := e.body.Execute(&body, params); err != nil { + return err + } + + email.Subject = subject.String() + email.Body = body.String() + return nil +}