Skip to content

Commit

Permalink
fix: opencdc unwrap (#1355)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb authored Feb 1, 2024
1 parent 1094b86 commit f3fbcc7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
58 changes: 38 additions & 20 deletions pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package procbuiltin

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -173,16 +174,23 @@ func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.D
if !ok {
return key, cerrors.Errorf("record payload after doesn't contain key")
}

switch k := ky.(type) {
case record.Data:
key = k
case map[string]interface{}:
convertedData := make(record.StructuredData, len(k))
for kk, v := range k {
convertedData[kk] = v
}
key = convertedData
case string:
key = record.RawData{Raw: []byte(k)}
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(k)))
n, err := base64.StdEncoding.Decode(decoded, []byte(k))
if err != nil {
return key, cerrors.Errorf("couldn't decode key: %w", err)
}
key = record.RawData{Raw: decoded[:n]}
default:
return key, cerrors.Errorf("expected a record.Data or a string, got %T", k)
}

return key, nil
}

Expand All @@ -200,25 +208,35 @@ func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (reco
case map[string]interface{}:
afterData, ok := p["after"]
if !ok {
return payload, cerrors.Errorf("record payload after doesn't contain payload.after")
}

data, ok := afterData.(map[string]interface{})
if !ok {
return payload, cerrors.Errorf("record payload after payload.after is not a map")
}

convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
return record.Change{}, cerrors.Errorf("record payload after doesn't contain payload after")
}
switch data := afterData.(type) {
case map[string]interface{}:
convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}
payload = record.Change{
Before: nil,
After: convertedData,
}
case string:
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(decoded, []byte(data))
if err != nil {
return payload, cerrors.Errorf("couldn't decode payload after: %w", err)
}
convertedData := record.RawData{Raw: decoded[:n]}

payload = record.Change{
Before: nil,
After: convertedData,
payload = record.Change{
Before: nil,
After: convertedData,
}
default:
return record.Change{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
}
default:
return payload, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
return record.Change{}, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
}
return payload, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ func TestUnwrap_Process(t *testing.T) {
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
"key": record.RawData{
Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh"),
"key": map[string]interface{}{
"id": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
},
"payload": record.Change{
Before: nil,
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestUnwrap_Process(t *testing.T) {
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
Key: record.StructuredData{"id": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh"},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
Expand Down Expand Up @@ -710,7 +710,7 @@ func TestUnwrap_Process(t *testing.T) {
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
Expand Down

0 comments on commit f3fbcc7

Please sign in to comment.