From 252498e296ab1e6b786504c09bf15463a9e26bdb Mon Sep 17 00:00:00 2001 From: Luke Tucker <64618+ltucker@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:20:11 -0500 Subject: [PATCH] chore: ingestion processing cleanup (#209) Co-authored-by: Michal Fiedorowicz --- .../postgres/queries/ingestion_logs.sql | 5 + diode-server/dbstore/postgres/repository.go | 13 + diode-server/gen/dbstore/postgres/adapters.go | 40 +++ .../dbstore/postgres/ingestion_logs.sql.go | 29 +++ .../diode/v1/reconcilerpb/reconciler.pb.go | 246 ++++-------------- diode-server/reconciler/applier/applier.go | 7 +- .../reconciler/applier/applier_test.go | 2 +- diode-server/reconciler/differ/differ.go | 7 +- .../reconciler/generate_change_set.go | 56 ++++ .../reconciler/ingestion_processor.go | 48 +--- diode-server/reconciler/mocks/repository.go | 68 +++++ diode-server/reconciler/repository.go | 1 + 12 files changed, 272 insertions(+), 250 deletions(-) create mode 100644 diode-server/gen/dbstore/postgres/adapters.go create mode 100644 diode-server/reconciler/generate_change_set.go diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 9f8334ac..47801b0d 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) ORDER BY id DESC LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); +-- name: RetrieveIngestionLogByExternalID :one +SELECT * +FROM ingestion_logs +WHERE external_id = $1; + -- name: RetrieveIngestionLogsWithChangeSets :many SELECT v_ingestion_logs_with_change_set.* FROM v_ingestion_logs_with_change_set diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index adb4ec07..10e2424a 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -56,6 +56,19 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon return &createdIngestionLog.ID, nil } +// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid) +func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid) + if err != nil { + return nil, nil, err + } + log, err := ingestionLog.ToProto() + if err != nil { + return nil, nil, err + } + return &ingestionLog.ID, log, nil +} + // UpdateIngestionLogStateWithError updates an ingestion log with a new state and error. func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error { params := postgres.UpdateIngestionLogStateWithErrorParams{ diff --git a/diode-server/gen/dbstore/postgres/adapters.go b/diode-server/gen/dbstore/postgres/adapters.go new file mode 100644 index 00000000..647186db --- /dev/null +++ b/diode-server/gen/dbstore/postgres/adapters.go @@ -0,0 +1,40 @@ +package postgres + +import ( + "fmt" + + "google.golang.org/protobuf/encoding/protojson" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" +) + +// ToProto converts sqlc structure to analogous protobuf +func (log IngestionLog) ToProto() (*reconcilerpb.IngestionLog, error) { + entity := &diodepb.Entity{} + if err := protojson.Unmarshal(log.Entity, entity); err != nil { + return nil, fmt.Errorf("failed to unmarshal entity: %w", err) + } + var ingestionErr reconcilerpb.IngestionError + if log.Error != nil { + if err := protojson.Unmarshal(log.Error, &ingestionErr); err != nil { + return nil, fmt.Errorf("failed to unmarshal error: %w", err) + } + } + + pblog := &reconcilerpb.IngestionLog{ + Id: log.ExternalID, + DataType: log.DataType.String, + State: reconcilerpb.State(log.State.Int32), + RequestId: log.RequestID.String, + IngestionTs: log.IngestionTs.Int64, + ProducerAppName: log.ProducerAppName.String, + ProducerAppVersion: log.ProducerAppVersion.String, + SdkName: log.SdkName.String, + SdkVersion: log.SdkVersion.String, + Entity: entity, + Error: &ingestionErr, + } + + return pblog, nil +} diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index da019b51..50ef4c8b 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -98,6 +98,35 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog return i, err } +const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one +SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +FROM ingestion_logs +WHERE external_id = $1 +` + +func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, externalID string) (IngestionLog, error) { + row := q.db.QueryRow(ctx, retrieveIngestionLogByExternalID, externalID) + var i IngestionLog + err := row.Scan( + &i.ID, + &i.ExternalID, + &i.DataType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at FROM ingestion_logs diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index df6dc146..40817567 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2 +// protoc-gen-go v1.35.2 // protoc (unknown) // source: diode/v1/reconciler.proto @@ -89,11 +89,9 @@ type IngestionDataSource struct { func (x *IngestionDataSource) Reset() { *x = IngestionDataSource{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionDataSource) String() string { @@ -104,7 +102,7 @@ func (*IngestionDataSource) ProtoMessage() {} func (x *IngestionDataSource) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -146,11 +144,9 @@ type RetrieveIngestionDataSourcesRequest struct { func (x *RetrieveIngestionDataSourcesRequest) Reset() { *x = RetrieveIngestionDataSourcesRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *RetrieveIngestionDataSourcesRequest) String() string { @@ -161,7 +157,7 @@ func (*RetrieveIngestionDataSourcesRequest) ProtoMessage() {} func (x *RetrieveIngestionDataSourcesRequest) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -208,11 +204,9 @@ type RetrieveIngestionDataSourcesResponse struct { func (x *RetrieveIngestionDataSourcesResponse) Reset() { *x = RetrieveIngestionDataSourcesResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *RetrieveIngestionDataSourcesResponse) String() string { @@ -223,7 +217,7 @@ func (*RetrieveIngestionDataSourcesResponse) ProtoMessage() {} func (x *RetrieveIngestionDataSourcesResponse) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -258,11 +252,9 @@ type IngestionError struct { func (x *IngestionError) Reset() { *x = IngestionError{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionError) String() string { @@ -273,7 +265,7 @@ func (*IngestionError) ProtoMessage() {} func (x *IngestionError) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -324,11 +316,9 @@ type IngestionMetrics struct { func (x *IngestionMetrics) Reset() { *x = IngestionMetrics{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionMetrics) String() string { @@ -339,7 +329,7 @@ func (*IngestionMetrics) ProtoMessage() {} func (x *IngestionMetrics) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -401,11 +391,9 @@ type ChangeSet struct { func (x *ChangeSet) Reset() { *x = ChangeSet{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *ChangeSet) String() string { @@ -416,7 +404,7 @@ func (*ChangeSet) ProtoMessage() {} func (x *ChangeSet) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -467,11 +455,9 @@ type IngestionLog struct { func (x *IngestionLog) Reset() { *x = IngestionLog{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionLog) String() string { @@ -482,7 +468,7 @@ func (*IngestionLog) ProtoMessage() {} func (x *IngestionLog) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -599,11 +585,9 @@ type RetrieveIngestionLogsRequest struct { func (x *RetrieveIngestionLogsRequest) Reset() { *x = RetrieveIngestionLogsRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *RetrieveIngestionLogsRequest) String() string { @@ -614,7 +598,7 @@ func (*RetrieveIngestionLogsRequest) ProtoMessage() {} func (x *RetrieveIngestionLogsRequest) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[7] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -698,11 +682,9 @@ type RetrieveIngestionLogsResponse struct { func (x *RetrieveIngestionLogsResponse) Reset() { *x = RetrieveIngestionLogsResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *RetrieveIngestionLogsResponse) String() string { @@ -713,7 +695,7 @@ func (*RetrieveIngestionLogsResponse) ProtoMessage() {} func (x *RetrieveIngestionLogsResponse) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -761,11 +743,9 @@ type IngestionError_Details struct { func (x *IngestionError_Details) Reset() { *x = IngestionError_Details{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionError_Details) String() string { @@ -776,7 +756,7 @@ func (*IngestionError_Details) ProtoMessage() {} func (x *IngestionError_Details) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[9] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -823,11 +803,9 @@ type IngestionError_Details_Error struct { func (x *IngestionError_Details_Error) Reset() { *x = IngestionError_Details_Error{} - if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_diode_v1_reconciler_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *IngestionError_Details_Error) String() string { @@ -838,7 +816,7 @@ func (*IngestionError_Details_Error) ProtoMessage() {} func (x *IngestionError_Details_Error) ProtoReflect() protoreflect.Message { mi := &file_diode_v1_reconciler_proto_msgTypes[10] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -1085,140 +1063,6 @@ func file_diode_v1_reconciler_proto_init() { if File_diode_v1_reconciler_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_diode_v1_reconciler_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*IngestionDataSource); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionDataSourcesRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionDataSourcesResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*IngestionError); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*IngestionMetrics); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*ChangeSet); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*IngestionLog); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionLogsRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionLogsResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*IngestionError_Details); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_diode_v1_reconciler_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*IngestionError_Details_Error); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } file_diode_v1_reconciler_proto_msgTypes[7].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ diff --git a/diode-server/reconciler/applier/applier.go b/diode-server/reconciler/applier/applier.go index e3aa71aa..439743ef 100644 --- a/diode-server/reconciler/applier/applier.go +++ b/diode-server/reconciler/applier/applier.go @@ -9,7 +9,7 @@ import ( ) // ApplyChangeSet applies a change set to NetBox -func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, branchID string, nbClient netboxdiodeplugin.NetBoxAPI) error { +func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, nbClient netboxdiodeplugin.NetBoxAPI) error { changes := make([]netboxdiodeplugin.Change, 0) for _, change := range cs.ChangeSet { changes = append(changes, netboxdiodeplugin.Change{ @@ -25,8 +25,9 @@ func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.Chang req := netboxdiodeplugin.ChangeSetRequest{ ChangeSetID: cs.ChangeSetID, ChangeSet: changes, - // TODO(mfiedorowicz): take branch from ChangeSet, remove parameter - BranchID: branchID, + } + if cs.BranchID != nil { + req.BranchID = *cs.BranchID } resp, err := nbClient.ApplyChangeSet(ctx, req) diff --git a/diode-server/reconciler/applier/applier_test.go b/diode-server/reconciler/applier/applier_test.go index caf9683f..d70c5893 100644 --- a/diode-server/reconciler/applier/applier_test.go +++ b/diode-server/reconciler/applier/applier_test.go @@ -61,7 +61,7 @@ func TestApplyChangeSet(t *testing.T) { mockNetBoxAPI.On("ApplyChangeSet", ctx, req).Return(resp, nil) - err := applier.ApplyChangeSet(ctx, logger, cs, "", mockNetBoxAPI) + err := applier.ApplyChangeSet(ctx, logger, cs, mockNetBoxAPI) assert.NoError(t, err) mockNetBoxAPI.AssertExpectations(t) } diff --git a/diode-server/reconciler/differ/differ.go b/diode-server/reconciler/differ/differ.go index e7f00ade..c35b5327 100644 --- a/diode-server/reconciler/differ/differ.go +++ b/diode-server/reconciler/differ/differ.go @@ -93,11 +93,14 @@ func Diff(ctx context.Context, entity IngestEntity, branchID string, netboxAPI n ObjectID: objectID, ObjectVersion: nil, Data: obj.Data(), - // TODO(mfiedorowicz): include branchID }) } - return &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil + cs := &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes} + if branchID != "" { + cs.BranchID = &branchID + } + return cs, nil } func retrieveObjectState(ctx context.Context, netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData, branchID string) (netbox.ComparableData, error) { diff --git a/diode-server/reconciler/generate_change_set.go b/diode-server/reconciler/generate_change_set.go new file mode 100644 index 00000000..4252c620 --- /dev/null +++ b/diode-server/reconciler/generate_change_set.go @@ -0,0 +1,56 @@ +package reconciler + +import ( + "context" + "errors" + "log/slog" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" + "github.com/netboxlabs/diode/diode-server/sentry" +) + +func generateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string, nbClient netboxdiodeplugin.NetBoxAPI, repository Repository, logger *slog.Logger) (*int32, *changeset.ChangeSet, error) { + ingestEntity := differ.IngestEntity{ + RequestID: ingestionLog.GetRequestId(), + DataType: ingestionLog.GetDataType(), + Entity: ingestionLog.GetEntity(), + State: int(ingestionLog.GetState()), + } + + changeSet, err := differ.Diff(ctx, ingestEntity, branchID, nbClient) + if err != nil { + tags := map[string]string{ + "request_id": ingestEntity.RequestID, + } + contextMap := map[string]any{ + "request_id": ingestEntity.RequestID, + "data_type": ingestEntity.DataType, + } + sentry.CaptureError(err, tags, "Ingest Entity", contextMap) + ingestionErr := extractIngestionError(err) + + if err2 := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err2 != nil { + err = errors.Join(err, err2) + } + return nil, nil, err + } + + changeSetID, err := repository.CreateChangeSet(ctx, *changeSet, ingestionLogID) + if err != nil { + return nil, nil, err + } + + if len(changeSet.ChangeSet) == 0 { + if err := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { + logger.Warn("failed to update ingestion log state (ignored)", "ingestionLogID", ingestionLog.GetId(), "error", err) + // TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log. + // return nil, err + } + } + + logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ChangeSetID, "ingestionLogID", ingestionLog.GetId()) + return changeSetID, changeSet, nil +} diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 09a8914b..3502ec8e 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -20,7 +20,6 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler/applier" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" - "github.com/netboxlabs/diode/diode-server/reconciler/differ" "github.com/netboxlabs/diode/diode-server/sentry" ) @@ -261,57 +260,21 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan return } - ingestEntity := differ.IngestEntity{ - RequestID: msg.ingestionLog.GetId(), - DataType: msg.ingestionLog.GetDataType(), - Entity: msg.ingestionLog.GetEntity(), - State: int(msg.ingestionLog.GetState()), - } - - changeSet, err := differ.Diff(ctx, ingestEntity, "", p.nbClient) - if err != nil { - tags := map[string]string{ - "request_id": ingestEntity.RequestID, - } - contextMap := map[string]any{ - "request_id": ingestEntity.RequestID, - "data_type": ingestEntity.DataType, - } - sentry.CaptureError(err, tags, "Ingest Entity", contextMap) - p.logger.Debug("failed to prepare change set", "ingestionLogID", msg.ingestionLog.GetId(), "error", err) - - msg.errors = append(msg.errors, fmt.Errorf("failed to prepare change set: %v", err)) - - ingestionErr := extractIngestionError(err) - - if err = p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { - msg.errors = append(msg.errors, err) - } - break - } - - msg.changeSet = changeSet - - id, err := p.repository.CreateChangeSet(ctx, *changeSet, msg.ingestionLogID) + id, changeSet, err := generateChangeSet(ctx, msg.ingestionLogID, msg.ingestionLog, "", p.nbClient, p.repository, p.logger) if err != nil { - msg.errors = append(msg.errors, fmt.Errorf("failed to create change set: %v", err)) + p.logger.Error("error generating changeset", "error", err) } - if len(changeSet.ChangeSet) > 0 { + if changeSet != nil && len(changeSet.ChangeSet) > 0 { if applyChangeSetChan != nil { applyChangeSetChan <- IngestionLogToProcess{ ingestionLogID: msg.ingestionLogID, ingestionLog: msg.ingestionLog, changeSetID: *id, - changeSet: msg.changeSet, + changeSet: changeSet, } } - } else { - if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { - msg.errors = append(msg.errors, err) - } } - p.logger.Debug("change set generated", "id", id, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID) } } }() @@ -342,10 +305,9 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha return } - if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, "", p.nbClient); err != nil { + if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, p.nbClient); err != nil { p.logger.Debug("failed to apply change set", "id", msg.changeSetID, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID, "error", err) msg.errors = append(msg.errors, fmt.Errorf("failed to apply change set: %v", err)) - ingestionErr := extractIngestionError(err) if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 176915e0..95ce784f 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -203,6 +203,74 @@ func (_c *Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Cont return _c } +// RetrieveIngestionLogByExternalID provides a mock function with given fields: ctx, uuid +func (_m *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ret := _m.Called(ctx, uuid) + + if len(ret) == 0 { + panic("no return value specified for RetrieveIngestionLogByExternalID") + } + + var r0 *int32 + var r1 *reconcilerpb.IngestionLog + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)); ok { + return rf(ctx, uuid) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *int32); ok { + r0 = rf(ctx, uuid) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) *reconcilerpb.IngestionLog); ok { + r1 = rf(ctx, uuid) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*reconcilerpb.IngestionLog) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, uuid) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Repository_RetrieveIngestionLogByExternalID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveIngestionLogByExternalID' +type Repository_RetrieveIngestionLogByExternalID_Call struct { + *mock.Call +} + +// RetrieveIngestionLogByExternalID is a helper method to define mock.On call +// - ctx context.Context +// - uuid string +func (_e *Repository_Expecter) RetrieveIngestionLogByExternalID(ctx interface{}, uuid interface{}) *Repository_RetrieveIngestionLogByExternalID_Call { + return &Repository_RetrieveIngestionLogByExternalID_Call{Call: _e.mock.On("RetrieveIngestionLogByExternalID", ctx, uuid)} +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Run(run func(ctx context.Context, uuid string)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Return(_a0 *int32, _a1 *reconcilerpb.IngestionLog, _a2 error) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) RunAndReturn(run func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(run) + return _c +} + // RetrieveIngestionLogs provides a mock function with given fields: ctx, filter, limit, offset func (_m *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) { ret := _m.Called(ctx, filter, limit, offset) diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index db1fb611..4389db67 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -11,6 +11,7 @@ import ( type Repository interface { CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error + RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error)