Skip to content

Commit

Permalink
fix: OpenCDC unwrap (#1343)
Browse files Browse the repository at this point in the history
* fix: opencdc unwrap

Co-authored-by: Nassor <[email protected]>

* address review comments

* add default cases

* refactor: add more tests

* doc: add comment on position

---------

Co-authored-by: Nassor <[email protected]>
Co-authored-by: Raúl Barroso <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2024
1 parent 30d2b52 commit dccde3f
Show file tree
Hide file tree
Showing 2 changed files with 494 additions and 3 deletions.
178 changes: 177 additions & 1 deletion pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (

FormatDebezium = "debezium"
FormatKafkaConnect = "kafka-connect"
FormatOpenCDC = "opencdc"
)

func init() {
Expand All @@ -60,6 +61,8 @@ func Unwrap(config processor.Config) (processor.Interface, error) {
proc.unwrapper = &debeziumUnwrapper{}
case FormatKafkaConnect:
proc.unwrapper = &kafkaConnectUnwrapper{}
case FormatOpenCDC:
proc.unwrapper = &openCDCUnwrapper{}
default:
return nil, cerrors.Errorf("%s: %q is not a valid format", unwrapProcType, format)
}
Expand Down Expand Up @@ -93,6 +96,180 @@ func (p *unwrapProcessor) Process(_ context.Context, in record.Record) (record.R
return out, nil
}

/*
Example of an OpenCDC record:
{
"key": "NWQ0N2UwZGQtNTkxYi00MGEyLTk3YzMtYzc1MDY0MWU3NTc1",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028881541916000",
"opencdc.version": "v1"
},
"operation": "create",
"payload": {
"after": {
"event_id": 2041181862,
"msg": "string 4c88f20f-aa77-4f4b-9354-e4fdb1989a52",
"pg_generator": false,
"sensor_id": 54434691,
"triggered": false
},
"before": null
},
"position": "ZWIwNmJiMmMtNWNhMS00YjUyLWE2ZmMtYzc0OTFlZDQ3OTYz"
}
*/

// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record.
type openCDCUnwrapper struct{}

// UnwrapOperation extracts operation from a structuredData record.
func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) {
var operation record.Operation
op, ok := structData["operation"]
if !ok {
return operation, cerrors.Errorf("record payload after doesn't contain operation")
}

switch opType := op.(type) {
case record.Operation:
operation = opType
case string:
if err := operation.UnmarshalText([]byte(opType)); err != nil {
return operation, cerrors.Errorf("couldn't unmarshal record operation")
}
default:
return operation, cerrors.Errorf("expected a record.Operation or a string, got %T", opType)
}
return operation, nil
}

// UnwrapMetadata extracts metadata from a structuredData record.
func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) {
var metadata record.Metadata
meta, ok := structData["metadata"]
if !ok {
return metadata, cerrors.Errorf("record payload after doesn't contain metadata")
}

switch m := meta.(type) {
case record.Metadata:
metadata = m
case map[string]interface{}:
metadata = make(record.Metadata, len(m))
for k, v := range m {
metadata[k] = fmt.Sprint(v)
}
default:
return metadata, cerrors.Errorf("expected a record.Metadata or a map[string]interface{}, got %T", m)
}
return metadata, nil
}

// UnwrapKey extracts key from a structuredData record.
func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) {
var key record.Data
ky, ok := structData["key"]
if !ok {
return key, cerrors.Errorf("record payload after doesn't contain key")
}

switch k := ky.(type) {
case record.Data:
key = k
case string:
key = record.RawData{Raw: []byte(k)}
default:
return key, cerrors.Errorf("expected a record.Data or a string, got %T", k)
}

return key, nil
}

// UnwrapPayload extracts payload from a structuredData record.
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
var payload record.Change
pl, ok := structData["payload"]
if !ok {
return payload, cerrors.Errorf("record payload doesn't contain payload")
}

switch p := pl.(type) {
case record.Change:
payload = p
case map[string]interface{}:
afterData, ok := p["after"]
if !ok {
return payload, cerrors.Errorf("record payload after doesn't contain payload.after")
}

data, ok := afterData.(map[string]interface{})
if !ok {
return payload, cerrors.Errorf("record payload after payload.after is not a map")
}

convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}

payload = record.Change{
Before: nil,
After: convertedData,
}
default:
return payload, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
}
return payload, nil
}

// Unwrap replaces the whole record.payload with record.payload.after.payload except position.
func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
var structData record.StructuredData
data := rec.Payload.After
switch d := data.(type) {
case record.RawData:
// unmarshal raw data to structured
if err := json.Unmarshal(data.Bytes(), &structData); err != nil {
return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err)
}
case record.StructuredData:
structData = d
default:
return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
}

operation, err := o.UnwrapOperation(structData)
if err != nil {
return record.Record{}, err
}

metadata, err := o.UnwrapMetadata(structData)
if err != nil {
return record.Record{}, err
}

key, err := o.UnwrapKey(structData)
if err != nil {
return record.Record{}, err
}

payload, err := o.UnwrapPayload(structData)
if err != nil {
return record.Record{}, err
}

// Position is the only key we preserve from the original record to maintain the reference respect other messages
// that will be coming from in the event of chaining pipelines (e.g.: source -> kafka, kafka -> destination)
return record.Record{
Key: key,
Position: rec.Position,
Metadata: metadata,
Payload: payload,
Operation: operation,
}, nil
}

/*
Example of a kafka-connect record:
{
Expand Down Expand Up @@ -186,7 +363,6 @@ Example of a debezium record:
"schema": {} // will be ignored
}
*/

// debeziumUnwrapper unwraps a debezium record from the payload.
type debeziumUnwrapper struct {
kafkaConnectUnwrapper kafkaConnectUnwrapper
Expand Down
Loading

0 comments on commit dccde3f

Please sign in to comment.