Skip to content

Commit

Permalink
feat(executor/kafka): add kafka headers for producer (#710)
Browse files Browse the repository at this point in the history
* add kafka headers

Signed-off-by: Rheza Satria <[email protected]>
  • Loading branch information
rhzs authored Sep 7, 2023
1 parent a37fb0d commit 098de9a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
1 change: 1 addition & 0 deletions executors/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ In your yaml file, you can use:
# for producer client type:
- messages
- messages.topic - Topic where to post message
- messages.headers - Headers for message (optional)
- messages.value - Value for message
- messages.valueFile - Take value for message from file provided here
- messages.avroSchemaFile - Specify Avro schema file. messages.valueFile or messages.value should have value, which can be encoded with that schema. If not provided, then it will retrieve the latest available version from schema registry using Topic Name strategy, that is, ${topicName}-value as subject.
Expand Down
34 changes: 26 additions & 8 deletions executors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ func New() venom.Executor {
type (
// Message represents the object sended or received from kafka
Message struct {
Topic string `json:"topic" yaml:"topic"`
Key string `json:"key" yaml:"key"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
ValueFile string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"`
AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"`
Topic string `json:"topic" yaml:"topic"`
Key string `json:"key" yaml:"key"`
Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
ValueFile string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"`
AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"`
}

// MessageJSON represents the object sended or received from kafka
Expand Down Expand Up @@ -212,9 +213,10 @@ func (e Executor) produceMessages(workdir string) error {
return err
}
messages = append(messages, &sarama.ProducerMessage{
Topic: message.Topic,
Key: sarama.ByteEncoder([]byte(message.Key)),
Value: sarama.ByteEncoder(value),
Topic: message.Topic,
Key: sarama.ByteEncoder([]byte(message.Key)),
Headers: convertToRecordHeaders(message.Headers),
Value: sarama.ByteEncoder(value),
})
}

Expand Down Expand Up @@ -544,3 +546,19 @@ func convertFromMessage2JSON(message *Message, msgJSON *MessageJSON) {
msgJSON.Key = listMessageJSON
}
}

func convertToRecordHeaders(headers map[string]string) []sarama.RecordHeader {
results := make([]sarama.RecordHeader, len(headers))
idx := 0

for k, v := range headers {
results[idx] = sarama.RecordHeader{
Key: sarama.ByteEncoder(k),
Value: sarama.ByteEncoder(v),
}

idx++
}

return results
}
3 changes: 3 additions & 0 deletions tests/kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ testcases:
- "{{.kafkaHost}}:{{.kafkaPort}}"
messages:
- topic: test-topic
headers:
'x-api-key': 'hola'
'x-something-else': 'ole'
value: '{"hello":"bar"}'
- type: kafka
clientType: consumer
Expand Down

0 comments on commit 098de9a

Please sign in to comment.