diff --git a/executors/kafka/README.md b/executors/kafka/README.md index d2c63692..32bf2a17 100644 --- a/executors/kafka/README.md +++ b/executors/kafka/README.md @@ -33,6 +33,7 @@ In your yaml file, you can use: # for producer client type: - messages - messages.topic - Topic where to post message + - messages.headers - Headers for message (optional) - messages.value - Value for message - messages.valueFile - Take value for message from file provided here - messages.avroSchemaFile - Specify Avro schema file. messages.valueFile or messages.value should have value, which can be encoded with that schema. If not provided, then it will retrieve the latest available version from schema registry using Topic Name strategy, that is, ${topicName}-value as subject. diff --git a/executors/kafka/kafka.go b/executors/kafka/kafka.go index 30f67c19..b3db2106 100644 --- a/executors/kafka/kafka.go +++ b/executors/kafka/kafka.go @@ -33,11 +33,12 @@ func New() venom.Executor { type ( // Message represents the object sended or received from kafka Message struct { - Topic string `json:"topic" yaml:"topic"` - Key string `json:"key" yaml:"key"` - Value string `json:"value,omitempty" yaml:"value,omitempty"` - ValueFile string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"` - AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"` + Topic string `json:"topic" yaml:"topic"` + Key string `json:"key" yaml:"key"` + Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + ValueFile string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"` + AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"` } // MessageJSON represents the object sended or received from kafka @@ -212,9 +213,10 @@ func (e Executor) produceMessages(workdir string) error { return err } messages = append(messages, &sarama.ProducerMessage{ - Topic: message.Topic, - Key: sarama.ByteEncoder([]byte(message.Key)), - Value: sarama.ByteEncoder(value), + Topic: message.Topic, + Key: sarama.ByteEncoder([]byte(message.Key)), + Headers: convertToRecordHeaders(message.Headers), + Value: sarama.ByteEncoder(value), }) } @@ -544,3 +546,19 @@ func convertFromMessage2JSON(message *Message, msgJSON *MessageJSON) { msgJSON.Key = listMessageJSON } } + +func convertToRecordHeaders(headers map[string]string) []sarama.RecordHeader { + results := make([]sarama.RecordHeader, len(headers)) + idx := 0 + + for k, v := range headers { + results[idx] = sarama.RecordHeader{ + Key: sarama.ByteEncoder(k), + Value: sarama.ByteEncoder(v), + } + + idx++ + } + + return results +} diff --git a/tests/kafka.yml b/tests/kafka.yml index 4cce5cdc..784a0177 100644 --- a/tests/kafka.yml +++ b/tests/kafka.yml @@ -14,6 +14,9 @@ testcases: - "{{.kafkaHost}}:{{.kafkaPort}}" messages: - topic: test-topic + headers: + 'x-api-key': 'hola' + 'x-something-else': 'ole' value: '{"hello":"bar"}' - type: kafka clientType: consumer