From 7efa32176433e19f29e4a66342ec58685c11ef2c Mon Sep 17 00:00:00 2001 From: Luke Tucker Date: Mon, 23 Dec 2024 11:38:32 -0500 Subject: [PATCH] small refactoring of reconciler flows --- diode-proto/diode/v1/reconciler.proto | 27 -- .../postgres/queries/ingestion_logs.sql | 5 + diode-server/dbstore/postgres/repository.go | 42 ++ .../dbstore/postgres/ingestion_logs.sql.go | 29 ++ .../diode/v1/reconcilerpb/reconciler.pb.go | 403 +++--------------- .../v1/reconcilerpb/reconciler.pb.validate.go | 384 ----------------- .../v1/reconcilerpb/reconciler_grpc.pb.go | 39 -- diode-server/ingester/component_test.go | 4 +- diode-server/reconciler/applier/applier.go | 7 +- .../reconciler/applier/applier_test.go | 2 +- diode-server/reconciler/differ/differ.go | 7 +- .../reconciler/generate_change_set.go | 56 +++ .../reconciler/ingestion_processor.go | 48 +-- diode-server/reconciler/mocks/repository.go | 68 +++ diode-server/reconciler/repository.go | 1 + 15 files changed, 288 insertions(+), 834 deletions(-) create mode 100644 diode-server/reconciler/generate_change_set.go diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index e8cba9fa..b4ea156b 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -111,37 +111,10 @@ message RetrieveIngestionLogsResponse { string next_page_token = 3; // Token for the next page of results, if any } -enum ActionType { - ACTION_UNSPECIFIED = 0; - ACTION_DIFF = 1; - ACTION_APPLY = 2; - ACTION_REJECT = 3; -} - -// The request to take action on an ingestion log -message ActionIngestionLogRequest { - ActionType action = 1; - string ingestion_log_id = 2; - optional string branch_id = 3; -} - -// The response from the ActionIngestionLog request -message ActionIngestionLogResponse { - message Error { - string message = 1; - int32 code = 2; - } - - IngestionLog log = 1; - repeated Error errors = 2; -} - // Reconciler service API service ReconcilerService { // Retrieves ingestion data sources rpc RetrieveIngestionDataSources(RetrieveIngestionDataSourcesRequest) returns (RetrieveIngestionDataSourcesResponse) {} // Retrieves ingestion logs rpc RetrieveIngestionLogs(RetrieveIngestionLogsRequest) returns (RetrieveIngestionLogsResponse); - // Takes action on an ingestion log - rpc ActionIngestionLog(ActionIngestionLogRequest) returns (ActionIngestionLogResponse); } diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 9f8334ac..47801b0d 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) ORDER BY id DESC LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); +-- name: RetrieveIngestionLogByExternalID :one +SELECT * +FROM ingestion_logs +WHERE external_id = $1; + -- name: RetrieveIngestionLogsWithChangeSets :many SELECT v_ingestion_logs_with_change_set.* FROM v_ingestion_logs_with_change_set diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index adb4ec07..afaffdf4 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -56,6 +56,48 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon return &createdIngestionLog.ID, nil } +// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid) +func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid) + if err != nil { + return nil, nil, err + } + log, err := pgToProtoIngestionLog(ingestionLog) + if err != nil { + return nil, nil, err + } + return &ingestionLog.ID, log, nil +} + +func pgToProtoIngestionLog(ingestionLog postgres.IngestionLog) (*reconcilerpb.IngestionLog, error) { + entity := &diodepb.Entity{} + if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil { + return nil, fmt.Errorf("failed to unmarshal entity: %w", err) + } + var ingestionErr reconcilerpb.IngestionError + if ingestionLog.Error != nil { + if err := protojson.Unmarshal(ingestionLog.Error, &ingestionErr); err != nil { + return nil, fmt.Errorf("failed to unmarshal error: %w", err) + } + } + + log := &reconcilerpb.IngestionLog{ + Id: ingestionLog.ExternalID, + DataType: ingestionLog.DataType.String, + State: reconcilerpb.State(ingestionLog.State.Int32), + RequestId: ingestionLog.RequestID.String, + IngestionTs: ingestionLog.IngestionTs.Int64, + ProducerAppName: ingestionLog.ProducerAppName.String, + ProducerAppVersion: ingestionLog.ProducerAppVersion.String, + SdkName: ingestionLog.SdkName.String, + SdkVersion: ingestionLog.SdkVersion.String, + Entity: entity, + Error: &ingestionErr, + } + + return log, nil +} + // UpdateIngestionLogStateWithError updates an ingestion log with a new state and error. func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error { params := postgres.UpdateIngestionLogStateWithErrorParams{ diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index da019b51..50ef4c8b 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -98,6 +98,35 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog return i, err } +const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one +SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +FROM ingestion_logs +WHERE external_id = $1 +` + +func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, externalID string) (IngestionLog, error) { + row := q.db.QueryRow(ctx, retrieveIngestionLogByExternalID, externalID) + var i IngestionLog + err := row.Scan( + &i.ID, + &i.ExternalID, + &i.DataType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at FROM ingestion_logs diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index 053f4091..40817567 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -77,58 +77,6 @@ func (State) EnumDescriptor() ([]byte, []int) { return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{0} } -type ActionType int32 - -const ( - ActionType_ACTION_UNSPECIFIED ActionType = 0 - ActionType_ACTION_DIFF ActionType = 1 - ActionType_ACTION_APPLY ActionType = 2 - ActionType_ACTION_REJECT ActionType = 3 -) - -// Enum value maps for ActionType. -var ( - ActionType_name = map[int32]string{ - 0: "ACTION_UNSPECIFIED", - 1: "ACTION_DIFF", - 2: "ACTION_APPLY", - 3: "ACTION_REJECT", - } - ActionType_value = map[string]int32{ - "ACTION_UNSPECIFIED": 0, - "ACTION_DIFF": 1, - "ACTION_APPLY": 2, - "ACTION_REJECT": 3, - } -) - -func (x ActionType) Enum() *ActionType { - p := new(ActionType) - *p = x - return p -} - -func (x ActionType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (ActionType) Descriptor() protoreflect.EnumDescriptor { - return file_diode_v1_reconciler_proto_enumTypes[1].Descriptor() -} - -func (ActionType) Type() protoreflect.EnumType { - return &file_diode_v1_reconciler_proto_enumTypes[1] -} - -func (x ActionType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use ActionType.Descriptor instead. -func (ActionType) EnumDescriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{1} -} - // An ingestion data source type IngestionDataSource struct { state protoimpl.MessageState @@ -783,122 +731,6 @@ func (x *RetrieveIngestionLogsResponse) GetNextPageToken() string { return "" } -// The request to take action on an ingestion log -type ActionIngestionLogRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Action ActionType `protobuf:"varint,1,opt,name=action,proto3,enum=diode.v1.ActionType" json:"action,omitempty"` - IngestionLogId string `protobuf:"bytes,2,opt,name=ingestion_log_id,json=ingestionLogId,proto3" json:"ingestion_log_id,omitempty"` - BranchId *string `protobuf:"bytes,3,opt,name=branch_id,json=branchId,proto3,oneof" json:"branch_id,omitempty"` -} - -func (x *ActionIngestionLogRequest) Reset() { - *x = ActionIngestionLogRequest{} - mi := &file_diode_v1_reconciler_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ActionIngestionLogRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ActionIngestionLogRequest) ProtoMessage() {} - -func (x *ActionIngestionLogRequest) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[9] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ActionIngestionLogRequest.ProtoReflect.Descriptor instead. -func (*ActionIngestionLogRequest) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{9} -} - -func (x *ActionIngestionLogRequest) GetAction() ActionType { - if x != nil { - return x.Action - } - return ActionType_ACTION_UNSPECIFIED -} - -func (x *ActionIngestionLogRequest) GetIngestionLogId() string { - if x != nil { - return x.IngestionLogId - } - return "" -} - -func (x *ActionIngestionLogRequest) GetBranchId() string { - if x != nil && x.BranchId != nil { - return *x.BranchId - } - return "" -} - -// The response from the ActionIngestionLog request -type ActionIngestionLogResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Log *IngestionLog `protobuf:"bytes,1,opt,name=log,proto3" json:"log,omitempty"` - Errors []*ActionIngestionLogResponse_Error `protobuf:"bytes,2,rep,name=errors,proto3" json:"errors,omitempty"` -} - -func (x *ActionIngestionLogResponse) Reset() { - *x = ActionIngestionLogResponse{} - mi := &file_diode_v1_reconciler_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ActionIngestionLogResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ActionIngestionLogResponse) ProtoMessage() {} - -func (x *ActionIngestionLogResponse) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[10] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ActionIngestionLogResponse.ProtoReflect.Descriptor instead. -func (*ActionIngestionLogResponse) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{10} -} - -func (x *ActionIngestionLogResponse) GetLog() *IngestionLog { - if x != nil { - return x.Log - } - return nil -} - -func (x *ActionIngestionLogResponse) GetErrors() []*ActionIngestionLogResponse_Error { - if x != nil { - return x.Errors - } - return nil -} - type IngestionError_Details struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -911,7 +743,7 @@ type IngestionError_Details struct { func (x *IngestionError_Details) Reset() { *x = IngestionError_Details{} - mi := &file_diode_v1_reconciler_proto_msgTypes[11] + mi := &file_diode_v1_reconciler_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -923,7 +755,7 @@ func (x *IngestionError_Details) String() string { func (*IngestionError_Details) ProtoMessage() {} func (x *IngestionError_Details) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[11] + mi := &file_diode_v1_reconciler_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -971,7 +803,7 @@ type IngestionError_Details_Error struct { func (x *IngestionError_Details_Error) Reset() { *x = IngestionError_Details_Error{} - mi := &file_diode_v1_reconciler_proto_msgTypes[12] + mi := &file_diode_v1_reconciler_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -983,7 +815,7 @@ func (x *IngestionError_Details_Error) String() string { func (*IngestionError_Details_Error) ProtoMessage() {} func (x *IngestionError_Details_Error) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[12] + mi := &file_diode_v1_reconciler_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1013,59 +845,6 @@ func (x *IngestionError_Details_Error) GetChangeId() string { return "" } -type ActionIngestionLogResponse_Error struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` - Code int32 `protobuf:"varint,2,opt,name=code,proto3" json:"code,omitempty"` -} - -func (x *ActionIngestionLogResponse_Error) Reset() { - *x = ActionIngestionLogResponse_Error{} - mi := &file_diode_v1_reconciler_proto_msgTypes[13] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ActionIngestionLogResponse_Error) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ActionIngestionLogResponse_Error) ProtoMessage() {} - -func (x *ActionIngestionLogResponse_Error) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[13] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ActionIngestionLogResponse_Error.ProtoReflect.Descriptor instead. -func (*ActionIngestionLogResponse_Error) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{10, 0} -} - -func (x *ActionIngestionLogResponse_Error) GetMessage() string { - if x != nil { - return x.Message - } - return "" -} - -func (x *ActionIngestionLogResponse_Error) GetCode() int32 { - if x != nil { - return x.Code - } - return 0 -} - var File_diode_v1_reconciler_proto protoreflect.FileDescriptor var file_diode_v1_reconciler_proto_rawDesc = []byte{ @@ -1193,73 +972,39 @@ var file_diode_v1_reconciler_proto_rawDesc = []byte{ 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x26, 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x65, 0x78, 0x74, 0x50, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, - 0x6e, 0x22, 0xa3, 0x01, 0x0a, 0x19, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x2c, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, - 0x14, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, - 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, - 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x09, 0x62, 0x72, 0x61, 0x6e, 0x63, - 0x68, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x62, 0x72, - 0x61, 0x6e, 0x63, 0x68, 0x49, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x62, 0x72, - 0x61, 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x22, 0xc1, 0x01, 0x0a, 0x1a, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x03, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, - 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x03, 0x6c, 0x6f, 0x67, - 0x12, 0x42, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x2a, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x06, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x73, 0x1a, 0x35, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x2a, 0x50, 0x0a, 0x05, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51, 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, - 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x43, 0x4f, 0x4e, 0x43, 0x49, 0x4c, 0x45, 0x44, 0x10, - 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, - 0x0a, 0x4e, 0x4f, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x53, 0x10, 0x04, 0x2a, 0x5a, 0x0a, - 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x41, - 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x49, - 0x46, 0x46, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, - 0x50, 0x50, 0x4c, 0x59, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x32, 0xdf, 0x02, 0x0a, 0x11, 0x52, 0x65, - 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, - 0x7f, 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, - 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, - 0x2d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, - 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, - 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, - 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, - 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x68, 0x0a, 0x15, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x64, 0x69, 0x6f, 0x64, + 0x6e, 0x2a, 0x50, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51, + 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x43, 0x4f, 0x4e, + 0x43, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, + 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x4f, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, + 0x53, 0x10, 0x04, 0x32, 0xfe, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, + 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7f, 0x0a, 0x1c, 0x52, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, + 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, - 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x27, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, - 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, - 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, 0x0a, 0x12, 0x41, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, - 0x12, 0x23, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, - 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0xa4, 0x01, 0x0a, 0x0c, - 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x52, 0x65, - 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, - 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, - 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x64, 0x69, - 0x6f, 0x64, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, - 0x72, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x44, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, - 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0xe2, - 0x02, 0x14, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x3a, 0x3a, - 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x68, 0x0a, 0x15, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, + 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, + 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, + 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x64, 0x69, + 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0xa4, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x69, 0x6f, + 0x64, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, + 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, 0x2f, + 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x76, 0x31, 0x2f, + 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x44, + 0x58, 0x58, 0xaa, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, + 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x44, 0x69, 0x6f, 0x64, 0x65, + 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, + 0x02, 0x09, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -1274,52 +1019,43 @@ func file_diode_v1_reconciler_proto_rawDescGZIP() []byte { return file_diode_v1_reconciler_proto_rawDescData } -var file_diode_v1_reconciler_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_diode_v1_reconciler_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_diode_v1_reconciler_proto_goTypes = []any{ (State)(0), // 0: diode.v1.State - (ActionType)(0), // 1: diode.v1.ActionType - (*IngestionDataSource)(nil), // 2: diode.v1.IngestionDataSource - (*RetrieveIngestionDataSourcesRequest)(nil), // 3: diode.v1.RetrieveIngestionDataSourcesRequest - (*RetrieveIngestionDataSourcesResponse)(nil), // 4: diode.v1.RetrieveIngestionDataSourcesResponse - (*IngestionError)(nil), // 5: diode.v1.IngestionError - (*IngestionMetrics)(nil), // 6: diode.v1.IngestionMetrics - (*ChangeSet)(nil), // 7: diode.v1.ChangeSet - (*IngestionLog)(nil), // 8: diode.v1.IngestionLog - (*RetrieveIngestionLogsRequest)(nil), // 9: diode.v1.RetrieveIngestionLogsRequest - (*RetrieveIngestionLogsResponse)(nil), // 10: diode.v1.RetrieveIngestionLogsResponse - (*ActionIngestionLogRequest)(nil), // 11: diode.v1.ActionIngestionLogRequest - (*ActionIngestionLogResponse)(nil), // 12: diode.v1.ActionIngestionLogResponse - (*IngestionError_Details)(nil), // 13: diode.v1.IngestionError.Details - (*IngestionError_Details_Error)(nil), // 14: diode.v1.IngestionError.Details.Error - (*ActionIngestionLogResponse_Error)(nil), // 15: diode.v1.ActionIngestionLogResponse.Error - (*diodepb.Entity)(nil), // 16: diode.v1.Entity + (*IngestionDataSource)(nil), // 1: diode.v1.IngestionDataSource + (*RetrieveIngestionDataSourcesRequest)(nil), // 2: diode.v1.RetrieveIngestionDataSourcesRequest + (*RetrieveIngestionDataSourcesResponse)(nil), // 3: diode.v1.RetrieveIngestionDataSourcesResponse + (*IngestionError)(nil), // 4: diode.v1.IngestionError + (*IngestionMetrics)(nil), // 5: diode.v1.IngestionMetrics + (*ChangeSet)(nil), // 6: diode.v1.ChangeSet + (*IngestionLog)(nil), // 7: diode.v1.IngestionLog + (*RetrieveIngestionLogsRequest)(nil), // 8: diode.v1.RetrieveIngestionLogsRequest + (*RetrieveIngestionLogsResponse)(nil), // 9: diode.v1.RetrieveIngestionLogsResponse + (*IngestionError_Details)(nil), // 10: diode.v1.IngestionError.Details + (*IngestionError_Details_Error)(nil), // 11: diode.v1.IngestionError.Details.Error + (*diodepb.Entity)(nil), // 12: diode.v1.Entity } var file_diode_v1_reconciler_proto_depIdxs = []int32{ - 2, // 0: diode.v1.RetrieveIngestionDataSourcesResponse.ingestion_data_sources:type_name -> diode.v1.IngestionDataSource - 13, // 1: diode.v1.IngestionError.details:type_name -> diode.v1.IngestionError.Details + 1, // 0: diode.v1.RetrieveIngestionDataSourcesResponse.ingestion_data_sources:type_name -> diode.v1.IngestionDataSource + 10, // 1: diode.v1.IngestionError.details:type_name -> diode.v1.IngestionError.Details 0, // 2: diode.v1.IngestionLog.state:type_name -> diode.v1.State - 16, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity - 5, // 4: diode.v1.IngestionLog.error:type_name -> diode.v1.IngestionError - 7, // 5: diode.v1.IngestionLog.change_set:type_name -> diode.v1.ChangeSet + 12, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity + 4, // 4: diode.v1.IngestionLog.error:type_name -> diode.v1.IngestionError + 6, // 5: diode.v1.IngestionLog.change_set:type_name -> diode.v1.ChangeSet 0, // 6: diode.v1.RetrieveIngestionLogsRequest.state:type_name -> diode.v1.State - 8, // 7: diode.v1.RetrieveIngestionLogsResponse.logs:type_name -> diode.v1.IngestionLog - 6, // 8: diode.v1.RetrieveIngestionLogsResponse.metrics:type_name -> diode.v1.IngestionMetrics - 1, // 9: diode.v1.ActionIngestionLogRequest.action:type_name -> diode.v1.ActionType - 8, // 10: diode.v1.ActionIngestionLogResponse.log:type_name -> diode.v1.IngestionLog - 15, // 11: diode.v1.ActionIngestionLogResponse.errors:type_name -> diode.v1.ActionIngestionLogResponse.Error - 14, // 12: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error - 3, // 13: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest - 9, // 14: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest - 11, // 15: diode.v1.ReconcilerService.ActionIngestionLog:input_type -> diode.v1.ActionIngestionLogRequest - 4, // 16: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse - 10, // 17: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse - 12, // 18: diode.v1.ReconcilerService.ActionIngestionLog:output_type -> diode.v1.ActionIngestionLogResponse - 16, // [16:19] is the sub-list for method output_type - 13, // [13:16] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 7, // 7: diode.v1.RetrieveIngestionLogsResponse.logs:type_name -> diode.v1.IngestionLog + 5, // 8: diode.v1.RetrieveIngestionLogsResponse.metrics:type_name -> diode.v1.IngestionMetrics + 11, // 9: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error + 2, // 10: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest + 8, // 11: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest + 3, // 12: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse + 9, // 13: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_diode_v1_reconciler_proto_init() } @@ -1328,14 +1064,13 @@ func file_diode_v1_reconciler_proto_init() { return } file_diode_v1_reconciler_proto_msgTypes[7].OneofWrappers = []any{} - file_diode_v1_reconciler_proto_msgTypes[9].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_diode_v1_reconciler_proto_rawDesc, - NumEnums: 2, - NumMessages: 14, + NumEnums: 1, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go index 79187394..0545e34c 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go @@ -1280,281 +1280,6 @@ var _ interface { ErrorName() string } = RetrieveIngestionLogsResponseValidationError{} -// Validate checks the field values on ActionIngestionLogRequest with the rules -// defined in the proto definition for this message. If any rules are -// violated, the first error encountered is returned, or nil if there are no violations. -func (m *ActionIngestionLogRequest) Validate() error { - return m.validate(false) -} - -// ValidateAll checks the field values on ActionIngestionLogRequest with the -// rules defined in the proto definition for this message. If any rules are -// violated, the result is a list of violation errors wrapped in -// ActionIngestionLogRequestMultiError, or nil if none found. -func (m *ActionIngestionLogRequest) ValidateAll() error { - return m.validate(true) -} - -func (m *ActionIngestionLogRequest) validate(all bool) error { - if m == nil { - return nil - } - - var errors []error - - // no validation rules for Action - - // no validation rules for IngestionLogId - - if m.BranchId != nil { - // no validation rules for BranchId - } - - if len(errors) > 0 { - return ActionIngestionLogRequestMultiError(errors) - } - - return nil -} - -// ActionIngestionLogRequestMultiError is an error wrapping multiple validation -// errors returned by ActionIngestionLogRequest.ValidateAll() if the -// designated constraints aren't met. -type ActionIngestionLogRequestMultiError []error - -// Error returns a concatenation of all the error messages it wraps. -func (m ActionIngestionLogRequestMultiError) Error() string { - var msgs []string - for _, err := range m { - msgs = append(msgs, err.Error()) - } - return strings.Join(msgs, "; ") -} - -// AllErrors returns a list of validation violation errors. -func (m ActionIngestionLogRequestMultiError) AllErrors() []error { return m } - -// ActionIngestionLogRequestValidationError is the validation error returned by -// ActionIngestionLogRequest.Validate if the designated constraints aren't met. -type ActionIngestionLogRequestValidationError struct { - field string - reason string - cause error - key bool -} - -// Field function returns field value. -func (e ActionIngestionLogRequestValidationError) Field() string { return e.field } - -// Reason function returns reason value. -func (e ActionIngestionLogRequestValidationError) Reason() string { return e.reason } - -// Cause function returns cause value. -func (e ActionIngestionLogRequestValidationError) Cause() error { return e.cause } - -// Key function returns key value. -func (e ActionIngestionLogRequestValidationError) Key() bool { return e.key } - -// ErrorName returns error name. -func (e ActionIngestionLogRequestValidationError) ErrorName() string { - return "ActionIngestionLogRequestValidationError" -} - -// Error satisfies the builtin error interface -func (e ActionIngestionLogRequestValidationError) Error() string { - cause := "" - if e.cause != nil { - cause = fmt.Sprintf(" | caused by: %v", e.cause) - } - - key := "" - if e.key { - key = "key for " - } - - return fmt.Sprintf( - "invalid %sActionIngestionLogRequest.%s: %s%s", - key, - e.field, - e.reason, - cause) -} - -var _ error = ActionIngestionLogRequestValidationError{} - -var _ interface { - Field() string - Reason() string - Key() bool - Cause() error - ErrorName() string -} = ActionIngestionLogRequestValidationError{} - -// Validate checks the field values on ActionIngestionLogResponse with the -// rules defined in the proto definition for this message. If any rules are -// violated, the first error encountered is returned, or nil if there are no violations. -func (m *ActionIngestionLogResponse) Validate() error { - return m.validate(false) -} - -// ValidateAll checks the field values on ActionIngestionLogResponse with the -// rules defined in the proto definition for this message. If any rules are -// violated, the result is a list of violation errors wrapped in -// ActionIngestionLogResponseMultiError, or nil if none found. -func (m *ActionIngestionLogResponse) ValidateAll() error { - return m.validate(true) -} - -func (m *ActionIngestionLogResponse) validate(all bool) error { - if m == nil { - return nil - } - - var errors []error - - if all { - switch v := interface{}(m.GetLog()).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: "Log", - reason: "embedded message failed validation", - cause: err, - }) - } - case interface{ Validate() error }: - if err := v.Validate(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: "Log", - reason: "embedded message failed validation", - cause: err, - }) - } - } - } else if v, ok := interface{}(m.GetLog()).(interface{ Validate() error }); ok { - if err := v.Validate(); err != nil { - return ActionIngestionLogResponseValidationError{ - field: "Log", - reason: "embedded message failed validation", - cause: err, - } - } - } - - for idx, item := range m.GetErrors() { - _, _ = idx, item - - if all { - switch v := interface{}(item).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - case interface{ Validate() error }: - if err := v.Validate(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - } - } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { - if err := v.Validate(); err != nil { - return ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - } - } - } - - } - - if len(errors) > 0 { - return ActionIngestionLogResponseMultiError(errors) - } - - return nil -} - -// ActionIngestionLogResponseMultiError is an error wrapping multiple -// validation errors returned by ActionIngestionLogResponse.ValidateAll() if -// the designated constraints aren't met. -type ActionIngestionLogResponseMultiError []error - -// Error returns a concatenation of all the error messages it wraps. -func (m ActionIngestionLogResponseMultiError) Error() string { - var msgs []string - for _, err := range m { - msgs = append(msgs, err.Error()) - } - return strings.Join(msgs, "; ") -} - -// AllErrors returns a list of validation violation errors. -func (m ActionIngestionLogResponseMultiError) AllErrors() []error { return m } - -// ActionIngestionLogResponseValidationError is the validation error returned -// by ActionIngestionLogResponse.Validate if the designated constraints aren't met. -type ActionIngestionLogResponseValidationError struct { - field string - reason string - cause error - key bool -} - -// Field function returns field value. -func (e ActionIngestionLogResponseValidationError) Field() string { return e.field } - -// Reason function returns reason value. -func (e ActionIngestionLogResponseValidationError) Reason() string { return e.reason } - -// Cause function returns cause value. -func (e ActionIngestionLogResponseValidationError) Cause() error { return e.cause } - -// Key function returns key value. -func (e ActionIngestionLogResponseValidationError) Key() bool { return e.key } - -// ErrorName returns error name. -func (e ActionIngestionLogResponseValidationError) ErrorName() string { - return "ActionIngestionLogResponseValidationError" -} - -// Error satisfies the builtin error interface -func (e ActionIngestionLogResponseValidationError) Error() string { - cause := "" - if e.cause != nil { - cause = fmt.Sprintf(" | caused by: %v", e.cause) - } - - key := "" - if e.key { - key = "key for " - } - - return fmt.Sprintf( - "invalid %sActionIngestionLogResponse.%s: %s%s", - key, - e.field, - e.reason, - cause) -} - -var _ error = ActionIngestionLogResponseValidationError{} - -var _ interface { - Field() string - Reason() string - Key() bool - Cause() error - ErrorName() string -} = ActionIngestionLogResponseValidationError{} - // Validate checks the field values on IngestionError_Details with the rules // defined in the proto definition for this message. If any rules are // violated, the first error encountered is returned, or nil if there are no violations. @@ -1801,112 +1526,3 @@ var _ interface { Cause() error ErrorName() string } = IngestionError_Details_ErrorValidationError{} - -// Validate checks the field values on ActionIngestionLogResponse_Error with -// the rules defined in the proto definition for this message. If any rules -// are violated, the first error encountered is returned, or nil if there are -// no violations. -func (m *ActionIngestionLogResponse_Error) Validate() error { - return m.validate(false) -} - -// ValidateAll checks the field values on ActionIngestionLogResponse_Error with -// the rules defined in the proto definition for this message. If any rules -// are violated, the result is a list of violation errors wrapped in -// ActionIngestionLogResponse_ErrorMultiError, or nil if none found. -func (m *ActionIngestionLogResponse_Error) ValidateAll() error { - return m.validate(true) -} - -func (m *ActionIngestionLogResponse_Error) validate(all bool) error { - if m == nil { - return nil - } - - var errors []error - - // no validation rules for Message - - // no validation rules for Code - - if len(errors) > 0 { - return ActionIngestionLogResponse_ErrorMultiError(errors) - } - - return nil -} - -// ActionIngestionLogResponse_ErrorMultiError is an error wrapping multiple -// validation errors returned by -// ActionIngestionLogResponse_Error.ValidateAll() if the designated -// constraints aren't met. -type ActionIngestionLogResponse_ErrorMultiError []error - -// Error returns a concatenation of all the error messages it wraps. -func (m ActionIngestionLogResponse_ErrorMultiError) Error() string { - var msgs []string - for _, err := range m { - msgs = append(msgs, err.Error()) - } - return strings.Join(msgs, "; ") -} - -// AllErrors returns a list of validation violation errors. -func (m ActionIngestionLogResponse_ErrorMultiError) AllErrors() []error { return m } - -// ActionIngestionLogResponse_ErrorValidationError is the validation error -// returned by ActionIngestionLogResponse_Error.Validate if the designated -// constraints aren't met. -type ActionIngestionLogResponse_ErrorValidationError struct { - field string - reason string - cause error - key bool -} - -// Field function returns field value. -func (e ActionIngestionLogResponse_ErrorValidationError) Field() string { return e.field } - -// Reason function returns reason value. -func (e ActionIngestionLogResponse_ErrorValidationError) Reason() string { return e.reason } - -// Cause function returns cause value. -func (e ActionIngestionLogResponse_ErrorValidationError) Cause() error { return e.cause } - -// Key function returns key value. -func (e ActionIngestionLogResponse_ErrorValidationError) Key() bool { return e.key } - -// ErrorName returns error name. -func (e ActionIngestionLogResponse_ErrorValidationError) ErrorName() string { - return "ActionIngestionLogResponse_ErrorValidationError" -} - -// Error satisfies the builtin error interface -func (e ActionIngestionLogResponse_ErrorValidationError) Error() string { - cause := "" - if e.cause != nil { - cause = fmt.Sprintf(" | caused by: %v", e.cause) - } - - key := "" - if e.key { - key = "key for " - } - - return fmt.Sprintf( - "invalid %sActionIngestionLogResponse_Error.%s: %s%s", - key, - e.field, - e.reason, - cause) -} - -var _ error = ActionIngestionLogResponse_ErrorValidationError{} - -var _ interface { - Field() string - Reason() string - Key() bool - Cause() error - ErrorName() string -} = ActionIngestionLogResponse_ErrorValidationError{} diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler_grpc.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler_grpc.pb.go index 9f5f83c0..1721feae 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler_grpc.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler_grpc.pb.go @@ -21,7 +21,6 @@ const _ = grpc.SupportPackageIsVersion7 const ( ReconcilerService_RetrieveIngestionDataSources_FullMethodName = "/diode.v1.ReconcilerService/RetrieveIngestionDataSources" ReconcilerService_RetrieveIngestionLogs_FullMethodName = "/diode.v1.ReconcilerService/RetrieveIngestionLogs" - ReconcilerService_ActionIngestionLog_FullMethodName = "/diode.v1.ReconcilerService/ActionIngestionLog" ) // ReconcilerServiceClient is the client API for ReconcilerService service. @@ -32,8 +31,6 @@ type ReconcilerServiceClient interface { RetrieveIngestionDataSources(ctx context.Context, in *RetrieveIngestionDataSourcesRequest, opts ...grpc.CallOption) (*RetrieveIngestionDataSourcesResponse, error) // Retrieves ingestion logs RetrieveIngestionLogs(ctx context.Context, in *RetrieveIngestionLogsRequest, opts ...grpc.CallOption) (*RetrieveIngestionLogsResponse, error) - // Takes action on an ingestion log - ActionIngestionLog(ctx context.Context, in *ActionIngestionLogRequest, opts ...grpc.CallOption) (*ActionIngestionLogResponse, error) } type reconcilerServiceClient struct { @@ -62,15 +59,6 @@ func (c *reconcilerServiceClient) RetrieveIngestionLogs(ctx context.Context, in return out, nil } -func (c *reconcilerServiceClient) ActionIngestionLog(ctx context.Context, in *ActionIngestionLogRequest, opts ...grpc.CallOption) (*ActionIngestionLogResponse, error) { - out := new(ActionIngestionLogResponse) - err := c.cc.Invoke(ctx, ReconcilerService_ActionIngestionLog_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // ReconcilerServiceServer is the server API for ReconcilerService service. // All implementations must embed UnimplementedReconcilerServiceServer // for forward compatibility @@ -79,8 +67,6 @@ type ReconcilerServiceServer interface { RetrieveIngestionDataSources(context.Context, *RetrieveIngestionDataSourcesRequest) (*RetrieveIngestionDataSourcesResponse, error) // Retrieves ingestion logs RetrieveIngestionLogs(context.Context, *RetrieveIngestionLogsRequest) (*RetrieveIngestionLogsResponse, error) - // Takes action on an ingestion log - ActionIngestionLog(context.Context, *ActionIngestionLogRequest) (*ActionIngestionLogResponse, error) mustEmbedUnimplementedReconcilerServiceServer() } @@ -94,9 +80,6 @@ func (UnimplementedReconcilerServiceServer) RetrieveIngestionDataSources(context func (UnimplementedReconcilerServiceServer) RetrieveIngestionLogs(context.Context, *RetrieveIngestionLogsRequest) (*RetrieveIngestionLogsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method RetrieveIngestionLogs not implemented") } -func (UnimplementedReconcilerServiceServer) ActionIngestionLog(context.Context, *ActionIngestionLogRequest) (*ActionIngestionLogResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ActionIngestionLog not implemented") -} func (UnimplementedReconcilerServiceServer) mustEmbedUnimplementedReconcilerServiceServer() {} // UnsafeReconcilerServiceServer may be embedded to opt out of forward compatibility for this service. @@ -146,24 +129,6 @@ func _ReconcilerService_RetrieveIngestionLogs_Handler(srv interface{}, ctx conte return interceptor(ctx, in, info, handler) } -func _ReconcilerService_ActionIngestionLog_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ActionIngestionLogRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ReconcilerServiceServer).ActionIngestionLog(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: ReconcilerService_ActionIngestionLog_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ReconcilerServiceServer).ActionIngestionLog(ctx, req.(*ActionIngestionLogRequest)) - } - return interceptor(ctx, in, info, handler) -} - // ReconcilerService_ServiceDesc is the grpc.ServiceDesc for ReconcilerService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -179,10 +144,6 @@ var ReconcilerService_ServiceDesc = grpc.ServiceDesc{ MethodName: "RetrieveIngestionLogs", Handler: _ReconcilerService_RetrieveIngestionLogs_Handler, }, - { - MethodName: "ActionIngestionLog", - Handler: _ReconcilerService_ActionIngestionLog_Handler, - }, }, Streams: []grpc.StreamDesc{}, Metadata: "diode/v1/reconciler.proto", diff --git a/diode-server/ingester/component_test.go b/diode-server/ingester/component_test.go index edc8b805..f78858c8 100644 --- a/diode-server/ingester/component_test.go +++ b/diode-server/ingester/component_test.go @@ -17,6 +17,7 @@ import ( pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" "github.com/netboxlabs/diode/diode-server/ingester" + pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler" "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) @@ -72,7 +73,8 @@ const bufSize = 1024 * 1024 func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) - server, err := reconciler.NewServer(ctx, logger, mockRepository) + nbClientMock := pluginmocks.NewNetBoxAPI(t) + server, err := reconciler.NewServer(ctx, logger, nbClientMock, mockRepository) require.NoError(t, err) errChan := make(chan error, 1) diff --git a/diode-server/reconciler/applier/applier.go b/diode-server/reconciler/applier/applier.go index e3aa71aa..439743ef 100644 --- a/diode-server/reconciler/applier/applier.go +++ b/diode-server/reconciler/applier/applier.go @@ -9,7 +9,7 @@ import ( ) // ApplyChangeSet applies a change set to NetBox -func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, branchID string, nbClient netboxdiodeplugin.NetBoxAPI) error { +func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, nbClient netboxdiodeplugin.NetBoxAPI) error { changes := make([]netboxdiodeplugin.Change, 0) for _, change := range cs.ChangeSet { changes = append(changes, netboxdiodeplugin.Change{ @@ -25,8 +25,9 @@ func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.Chang req := netboxdiodeplugin.ChangeSetRequest{ ChangeSetID: cs.ChangeSetID, ChangeSet: changes, - // TODO(mfiedorowicz): take branch from ChangeSet, remove parameter - BranchID: branchID, + } + if cs.BranchID != nil { + req.BranchID = *cs.BranchID } resp, err := nbClient.ApplyChangeSet(ctx, req) diff --git a/diode-server/reconciler/applier/applier_test.go b/diode-server/reconciler/applier/applier_test.go index caf9683f..d70c5893 100644 --- a/diode-server/reconciler/applier/applier_test.go +++ b/diode-server/reconciler/applier/applier_test.go @@ -61,7 +61,7 @@ func TestApplyChangeSet(t *testing.T) { mockNetBoxAPI.On("ApplyChangeSet", ctx, req).Return(resp, nil) - err := applier.ApplyChangeSet(ctx, logger, cs, "", mockNetBoxAPI) + err := applier.ApplyChangeSet(ctx, logger, cs, mockNetBoxAPI) assert.NoError(t, err) mockNetBoxAPI.AssertExpectations(t) } diff --git a/diode-server/reconciler/differ/differ.go b/diode-server/reconciler/differ/differ.go index e7f00ade..c35b5327 100644 --- a/diode-server/reconciler/differ/differ.go +++ b/diode-server/reconciler/differ/differ.go @@ -93,11 +93,14 @@ func Diff(ctx context.Context, entity IngestEntity, branchID string, netboxAPI n ObjectID: objectID, ObjectVersion: nil, Data: obj.Data(), - // TODO(mfiedorowicz): include branchID }) } - return &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil + cs := &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes} + if branchID != "" { + cs.BranchID = &branchID + } + return cs, nil } func retrieveObjectState(ctx context.Context, netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData, branchID string) (netbox.ComparableData, error) { diff --git a/diode-server/reconciler/generate_change_set.go b/diode-server/reconciler/generate_change_set.go new file mode 100644 index 00000000..0ca6b5db --- /dev/null +++ b/diode-server/reconciler/generate_change_set.go @@ -0,0 +1,56 @@ +package reconciler + +import ( + "context" + "errors" + "log/slog" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" + "github.com/netboxlabs/diode/diode-server/sentry" +) + +func generateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string, nbClient netboxdiodeplugin.NetBoxAPI, repository Repository, logger *slog.Logger) (*int32, *changeset.ChangeSet, error) { + ingestEntity := differ.IngestEntity{ + RequestID: ingestionLog.GetId(), // ???(ltucker): GetRequestId() ? + DataType: ingestionLog.GetDataType(), + Entity: ingestionLog.GetEntity(), + State: int(ingestionLog.GetState()), + } + + changeSet, err := differ.Diff(ctx, ingestEntity, branchID, nbClient) + if err != nil { + tags := map[string]string{ + "request_id": ingestEntity.RequestID, + } + contextMap := map[string]any{ + "request_id": ingestEntity.RequestID, + "data_type": ingestEntity.DataType, + } + sentry.CaptureError(err, tags, "Ingest Entity", contextMap) + ingestionErr := extractIngestionError(err) + + if err2 := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err2 != nil { + err = errors.Join(err, err2) + } + return nil, nil, err + } + + changeSetID, err := repository.CreateChangeSet(ctx, *changeSet, ingestionLogID) + if err != nil { + return nil, nil, err + } + + if len(changeSet.ChangeSet) == 0 { + if err := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { + logger.Warn("failed to update ingestion log state (ignored)", "ingestionLogID", ingestionLog.GetId(), "error", err) + // TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log. + // return nil, err + } + } + + logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ChangeSetID, "ingestionLogID", ingestionLog.GetId()) + return changeSetID, changeSet, nil +} diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 09a8914b..3502ec8e 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -20,7 +20,6 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler/applier" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" - "github.com/netboxlabs/diode/diode-server/reconciler/differ" "github.com/netboxlabs/diode/diode-server/sentry" ) @@ -261,57 +260,21 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan return } - ingestEntity := differ.IngestEntity{ - RequestID: msg.ingestionLog.GetId(), - DataType: msg.ingestionLog.GetDataType(), - Entity: msg.ingestionLog.GetEntity(), - State: int(msg.ingestionLog.GetState()), - } - - changeSet, err := differ.Diff(ctx, ingestEntity, "", p.nbClient) - if err != nil { - tags := map[string]string{ - "request_id": ingestEntity.RequestID, - } - contextMap := map[string]any{ - "request_id": ingestEntity.RequestID, - "data_type": ingestEntity.DataType, - } - sentry.CaptureError(err, tags, "Ingest Entity", contextMap) - p.logger.Debug("failed to prepare change set", "ingestionLogID", msg.ingestionLog.GetId(), "error", err) - - msg.errors = append(msg.errors, fmt.Errorf("failed to prepare change set: %v", err)) - - ingestionErr := extractIngestionError(err) - - if err = p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { - msg.errors = append(msg.errors, err) - } - break - } - - msg.changeSet = changeSet - - id, err := p.repository.CreateChangeSet(ctx, *changeSet, msg.ingestionLogID) + id, changeSet, err := generateChangeSet(ctx, msg.ingestionLogID, msg.ingestionLog, "", p.nbClient, p.repository, p.logger) if err != nil { - msg.errors = append(msg.errors, fmt.Errorf("failed to create change set: %v", err)) + p.logger.Error("error generating changeset", "error", err) } - if len(changeSet.ChangeSet) > 0 { + if changeSet != nil && len(changeSet.ChangeSet) > 0 { if applyChangeSetChan != nil { applyChangeSetChan <- IngestionLogToProcess{ ingestionLogID: msg.ingestionLogID, ingestionLog: msg.ingestionLog, changeSetID: *id, - changeSet: msg.changeSet, + changeSet: changeSet, } } - } else { - if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { - msg.errors = append(msg.errors, err) - } } - p.logger.Debug("change set generated", "id", id, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID) } } }() @@ -342,10 +305,9 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha return } - if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, "", p.nbClient); err != nil { + if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, p.nbClient); err != nil { p.logger.Debug("failed to apply change set", "id", msg.changeSetID, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID, "error", err) msg.errors = append(msg.errors, fmt.Errorf("failed to apply change set: %v", err)) - ingestionErr := extractIngestionError(err) if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 176915e0..95ce784f 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -203,6 +203,74 @@ func (_c *Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Cont return _c } +// RetrieveIngestionLogByExternalID provides a mock function with given fields: ctx, uuid +func (_m *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ret := _m.Called(ctx, uuid) + + if len(ret) == 0 { + panic("no return value specified for RetrieveIngestionLogByExternalID") + } + + var r0 *int32 + var r1 *reconcilerpb.IngestionLog + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)); ok { + return rf(ctx, uuid) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *int32); ok { + r0 = rf(ctx, uuid) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) *reconcilerpb.IngestionLog); ok { + r1 = rf(ctx, uuid) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*reconcilerpb.IngestionLog) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, uuid) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Repository_RetrieveIngestionLogByExternalID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveIngestionLogByExternalID' +type Repository_RetrieveIngestionLogByExternalID_Call struct { + *mock.Call +} + +// RetrieveIngestionLogByExternalID is a helper method to define mock.On call +// - ctx context.Context +// - uuid string +func (_e *Repository_Expecter) RetrieveIngestionLogByExternalID(ctx interface{}, uuid interface{}) *Repository_RetrieveIngestionLogByExternalID_Call { + return &Repository_RetrieveIngestionLogByExternalID_Call{Call: _e.mock.On("RetrieveIngestionLogByExternalID", ctx, uuid)} +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Run(run func(ctx context.Context, uuid string)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Return(_a0 *int32, _a1 *reconcilerpb.IngestionLog, _a2 error) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) RunAndReturn(run func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(run) + return _c +} + // RetrieveIngestionLogs provides a mock function with given fields: ctx, filter, limit, offset func (_m *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) { ret := _m.Called(ctx, filter, limit, offset) diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index db1fb611..4389db67 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -11,6 +11,7 @@ import ( type Repository interface { CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error + RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error)