Skip to content

Commit

Permalink
feat: store generated changeset into ingestion log (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoparente authored Sep 20, 2024
1 parent 4be8d62 commit 44e1bcd
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 145 deletions.
7 changes: 7 additions & 0 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ message IngestionMetrics {
int32 no_changes = 5;
}

// A change set
message ChangeSet {
string id = 1; // A change set ID
bytes data = 2; // Binary data representing the change set
}

// An ingestion log
message IngestionLog {
string id = 1;
Expand All @@ -83,6 +89,7 @@ message IngestionLog {
string sdk_version = 9;
diode.v1.Entity entity = 10;
IngestionError error = 11;
ChangeSet change_set = 12;
}

// The request to retrieve ingestion logs
Expand Down
346 changes: 215 additions & 131 deletions diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go

Large diffs are not rendered by default.

132 changes: 132 additions & 0 deletions diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions diode-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.1.0
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
Expand Down
2 changes: 2 additions & 0 deletions diode-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
Expand Down
41 changes: 39 additions & 2 deletions diode-server/reconciler/ingestion_processor.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package reconciler

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"regexp"
"strconv"

"github.com/andybalholm/brotli"
"github.com/kelseyhightower/envconfig"
"github.com/redis/go-redis/v9"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -232,6 +235,16 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
ingestionLog.State = reconcilerpb.State_FAILED
ingestionLog.Error = extractIngestionError(err)

if changeSet != nil {
ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID}
csCompressed, err := compressChangeSet(changeSet)
if err != nil {
errs = append(errs, err)
} else {
ingestionLog.ChangeSet.Data = csCompressed
}
}

if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil {
errs = append(errs, err)
}
Expand All @@ -240,7 +253,13 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.

if changeSet != nil {
ingestionLog.State = reconcilerpb.State_RECONCILED
//TODO: add change set ID to ingestion log
ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID}
csCompressed, err := compressChangeSet(changeSet)
if err != nil {
errs = append(errs, err)
} else {
ingestionLog.ChangeSet.Data = csCompressed
}
} else {
ingestionLog.State = reconcilerpb.State_NO_CHANGES
}
Expand Down Expand Up @@ -328,7 +347,7 @@ func (p *IngestionProcessor) reconcileEntity(ctx context.Context, ingestEntity c

resp, err := p.nbClient.ApplyChangeSet(ctx, req)
if err != nil {
return nil, err
return cs, err
}

p.logger.Debug("apply change set response", "response", resp)
Expand Down Expand Up @@ -356,6 +375,24 @@ func normalizeIngestionLog(l []byte) []byte {
return re.ReplaceAll(l, []byte(`"ingestionTs":$1`))
}

func compressChangeSet(cs *changeset.ChangeSet) ([]byte, error) {
csJSON, err := json.Marshal(cs)
if err != nil {
return nil, fmt.Errorf("failed to marshal changeset JSON: %v", err)
}

var brotliBuf bytes.Buffer
brotliWriter := brotli.NewWriter(&brotliBuf)
if _, err = brotliWriter.Write(csJSON); err != nil {
return nil, fmt.Errorf("failed to compress changeset: %v", err)
}
if err = brotliWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to compress changeset: %v", err)
}

return brotliBuf.Bytes(), nil
}

func extractObjectType(in *diodepb.Entity) (string, error) {
switch in.GetEntity().(type) {
case *diodepb.Entity_Device:
Expand Down
Loading

0 comments on commit 44e1bcd

Please sign in to comment.