Skip to content

Commit

Permalink
feat: modify kafka extractor (#255)
Browse files Browse the repository at this point in the history
* feat(kafka): extract number of partitions and allow urn prefixing

* feat(kafka): skip default topics

* fix(enrich): kafka custom properties not populated
  • Loading branch information
StewartJingga authored Oct 16, 2021
1 parent c961884 commit 5cd957d
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 15 deletions.
1 change: 1 addition & 0 deletions plugins/extractors/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ source:
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `broker` | `string` | `localhost:9092` | Kafka broker's host | *required* |
| `urn_prefix` | `string` | `samplePrefix-` | Prefix to be prepended to urn field | *optional* |

## Outputs

Expand Down
52 changes: 42 additions & 10 deletions plugins/extractors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
_ "embed" // used to print the embedded assets

"github.com/pkg/errors"

"github.com/odpf/meteor/models"
Expand All @@ -19,9 +20,16 @@ import (
//go:embed README.md
var summary string

// defaultTopics is the set of topic names that Extract skips instead of
// emitting as records. Only key membership matters, so the values are empty
// structs rather than placeholder bytes.
//
// NOTE(review): "__consumer_offsets" is presumably Kafka's internal offsets
// topic and "_schemas" the Schema Registry default topic — confirm before
// adding further entries.
var defaultTopics = map[string]struct{}{
	"__consumer_offsets": {},
	"_schemas":           {},
}

// Config holds the set of configuration for the kafka extractor
type Config struct {
Broker string `mapstructure:"broker" validate:"required"`
Broker string `mapstructure:"broker" validate:"required"`
UrnPrefix string `mapstructure:"urn_prefix"`
}

var sampleConfig = `
Expand Down Expand Up @@ -85,30 +93,54 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
}

// collect topic list from partition list
topics := map[string]bool{}
topics := map[string]int{}
for _, p := range partitions {
topics[p.Topic] = true
_, ok := topics[p.Topic]
if !ok {
topics[p.Topic] = 0
}

topics[p.Topic]++
}

// process topics
for topicName := range topics {
emit(models.NewRecord(e.buildTopic(topicName)))
// build and push topics
for topic, numOfPartitions := range topics {
// skip if topic is a default topic
_, isDefaultTopic := defaultTopics[topic]
if isDefaultTopic {
continue
}

record := models.NewRecord(e.buildTopic(topic, numOfPartitions))
emit(record)
}

return
}

// Build topic metadata model using a topic name
func (e *Extractor) buildTopic(topicName string) *assets.Topic {
// Build topic metadata model using a topic and number of partitions
func (e *Extractor) buildTopic(topic string, numOfPartitions int) *assets.Topic {
return &assets.Topic{
Resource: &common.Resource{
Urn: topicName,
Name: topicName,
Urn: e.buildUrn(topic),
Name: topic,
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: int64(numOfPartitions),
},
}
}

// buildUrn returns the urn for the given topic: the configured UrnPrefix
// followed by the topic name. An empty prefix leaves the topic name unchanged,
// so no explicit emptiness check is required.
func (e *Extractor) buildUrn(topic string) string {
	return e.config.UrnPrefix + topic
}

func init() {
if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
return New(plugins.GetLog())
Expand Down
67 changes: 65 additions & 2 deletions plugins/extractors/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package kafka_test

import (
"context"
"github.com/odpf/meteor/test/utils"
"log"
"net"

"github.com/odpf/meteor/test/utils"

"os"
"strconv"
"testing"
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestInit(t *testing.T) {
}

func TestExtract(t *testing.T) {
t.Run("should return list of topic metadata", func(t *testing.T) {
t.Run("should emit list of topic metadata", func(t *testing.T) {
ctx := context.TODO()
extr := newExtractor()
err := extr.Init(ctx, map[string]interface{}{
Expand All @@ -115,20 +116,82 @@ func TestExtract(t *testing.T) {
Name: "meteor-test-topic-1",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
models.NewRecord(&assets.Topic{
Resource: &common.Resource{
Urn: "meteor-test-topic-2",
Name: "meteor-test-topic-2",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
models.NewRecord(&assets.Topic{
Resource: &common.Resource{
Urn: "meteor-test-topic-3",
Name: "meteor-test-topic-3",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
}
// We need this function because the extractor cannot guarantee order
// so comparing expected slice and result slice will not be consistent
assertResults(t, expected, emitter.Get())
})

t.Run("should add prefix to urn if urn_prefix is defined", func(t *testing.T) {
ctx := context.TODO()
extr := newExtractor()
err := extr.Init(ctx, map[string]interface{}{
"broker": brokerHost,
"urn_prefix": "samplePrefix-",
})
if err != nil {
t.Fatal(err)
}

emitter := mocks.NewEmitter()
err = extr.Extract(ctx, emitter.Push)
assert.NoError(t, err)

// assert results with expected data
expected := []models.Record{
models.NewRecord(&assets.Topic{
Resource: &common.Resource{
Urn: "samplePrefix-meteor-test-topic-1",
Name: "meteor-test-topic-1",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
models.NewRecord(&assets.Topic{
Resource: &common.Resource{
Urn: "samplePrefix-meteor-test-topic-2",
Name: "meteor-test-topic-2",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
models.NewRecord(&assets.Topic{
Resource: &common.Resource{
Urn: "samplePrefix-meteor-test-topic-3",
Name: "meteor-test-topic-3",
Service: "kafka",
},
Profile: &assets.TopicProfile{
NumberOfPartitions: 1,
},
}),
}
// We need this function because the extractor cannot guarantee order
Expand Down
3 changes: 0 additions & 3 deletions plugins/processors/enrich/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func (p *Processor) process(record models.Record) (models.Metadata, error) {
data := record.Data()
p.logger.Debug("enriching record", "record", data.GetResource().Urn)
customProps := utils.GetCustomProperties(data)
if customProps == nil {
return data, nil
}

// update custom properties using value from config
for key, value := range p.config {
Expand Down
6 changes: 6 additions & 0 deletions utils/custom_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ func SetCustomProperties(metadata models.Metadata, customFields map[string]inter

switch metadata := metadata.(type) {
case *assets.Table:
metadata.Properties = properties
case *assets.Topic:
metadata.Properties = properties
case *assets.Dashboard:
metadata.Properties = properties
case *assets.Bucket:
metadata.Properties = properties
case *assets.Group:
metadata.Properties = properties
case *assets.Job:
metadata.Properties = properties
case *assets.User:
metadata.Properties = properties
}
Expand Down

0 comments on commit 5cd957d

Please sign in to comment.