diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index e8cba9fa..9ff0a148 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -127,13 +127,7 @@ message ActionIngestionLogRequest { // The response from the ActionIngestionLog request message ActionIngestionLogResponse { - message Error { - string message = 1; - int32 code = 2; - } - IngestionLog log = 1; - repeated Error errors = 2; } // Reconciler service API diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index dd5a2cd2..5dbf6bd5 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -14,6 +14,7 @@ import ( "github.com/netboxlabs/diode/diode-server/dbstore/postgres" "github.com/netboxlabs/diode/diode-server/migrator" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler" "github.com/netboxlabs/diode/diode-server/server" ) @@ -44,6 +45,11 @@ func main() { defer dbPool.Close() repository := postgres.NewRepository(dbPool) + nbClient, err := netboxdiodeplugin.NewClient(s.Logger(), cfg.DiodeToNetBoxAPIKey) + if err != nil { + s.Logger().Error("failed to create netbox diode plugin client", "error", err) + os.Exit(1) + } ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), repository) if err != nil { @@ -56,7 +62,7 @@ func main() { os.Exit(1) } - gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository) + gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), nbClient, repository) if err != nil { s.Logger().Error("failed to instantiate gRPC server", "error", err) os.Exit(1) diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 9f8334ac..47801b0d 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) ORDER BY id DESC LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); +-- name: RetrieveIngestionLogByExternalID :one +SELECT * +FROM ingestion_logs +WHERE external_id = $1; + -- name: RetrieveIngestionLogsWithChangeSets :many SELECT v_ingestion_logs_with_change_set.* FROM v_ingestion_logs_with_change_set diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index adb4ec07..16ef181f 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -3,10 +3,14 @@ package postgres import ( "context" "encoding/json" + "errors" "fmt" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres" @@ -56,6 +60,59 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon return &createdIngestionLog.ID, nil } +// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid) +func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid) + if err != nil { + return nil, nil, mapRepositoryError(err) + } + log, err := pgToProtoIngestionLog(ingestionLog) + if err != nil { + return nil, nil, mapRepositoryError(err) + } + return &ingestionLog.ID, log, nil +} + +func mapRepositoryError(err error) error { + if err == nil { + return nil + } + if errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("%w: %w", status.Errorf(codes.NotFound, "not found"), err) + } + // TODO(ltucker): pack appropriate error details ... + return err +} + +func pgToProtoIngestionLog(ingestionLog postgres.IngestionLog) (*reconcilerpb.IngestionLog, error) { + entity := &diodepb.Entity{} + if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil { + return nil, fmt.Errorf("failed to unmarshal entity: %w", err) + } + var ingestionErr reconcilerpb.IngestionError + if ingestionLog.Error != nil { + if err := protojson.Unmarshal(ingestionLog.Error, &ingestionErr); err != nil { + return nil, fmt.Errorf("failed to unmarshal error: %w", err) + } + } + + log := &reconcilerpb.IngestionLog{ + Id: ingestionLog.ExternalID, + DataType: ingestionLog.DataType.String, + State: reconcilerpb.State(ingestionLog.State.Int32), + RequestId: ingestionLog.RequestID.String, + IngestionTs: ingestionLog.IngestionTs.Int64, + ProducerAppName: ingestionLog.ProducerAppName.String, + ProducerAppVersion: ingestionLog.ProducerAppVersion.String, + SdkName: ingestionLog.SdkName.String, + SdkVersion: ingestionLog.SdkVersion.String, + Entity: entity, + Error: &ingestionErr, + } + + return log, nil +} + // UpdateIngestionLogStateWithError updates an ingestion log with a new state and error. func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error { params := postgres.UpdateIngestionLogStateWithErrorParams{ diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index da019b51..50ef4c8b 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -98,6 +98,35 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog return i, err } +const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one +SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +FROM ingestion_logs +WHERE external_id = $1 +` + +func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, externalID string) (IngestionLog, error) { + row := q.db.QueryRow(ctx, retrieveIngestionLogByExternalID, externalID) + var i IngestionLog + err := row.Scan( + &i.ID, + &i.ExternalID, + &i.DataType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at FROM ingestion_logs diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index 053f4091..9a62a25d 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -437,8 +437,9 @@ type ChangeSet struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // A change set ID - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // Binary data representing the change set + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // A change set ID + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // Binary data representing the change set + BranchId *string `protobuf:"bytes,3,opt,name=branch_id,json=branchId,proto3,oneof" json:"branch_id,omitempty"` // Branch that change set was generated against } func (x *ChangeSet) Reset() { @@ -485,6 +486,13 @@ func (x *ChangeSet) GetData() []byte { return nil } +func (x *ChangeSet) GetBranchId() string { + if x != nil && x.BranchId != nil { + return *x.BranchId + } + return "" +} + // An ingestion log type IngestionLog struct { state protoimpl.MessageState @@ -851,8 +859,7 @@ type ActionIngestionLogResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Log *IngestionLog `protobuf:"bytes,1,opt,name=log,proto3" json:"log,omitempty"` - Errors []*ActionIngestionLogResponse_Error `protobuf:"bytes,2,rep,name=errors,proto3" json:"errors,omitempty"` + Log *IngestionLog `protobuf:"bytes,1,opt,name=log,proto3" json:"log,omitempty"` } func (x *ActionIngestionLogResponse) Reset() { @@ -892,13 +899,6 @@ func (x *ActionIngestionLogResponse) GetLog() *IngestionLog { return nil } -func (x *ActionIngestionLogResponse) GetErrors() []*ActionIngestionLogResponse_Error { - if x != nil { - return x.Errors - } - return nil -} - type IngestionError_Details struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1013,59 +1013,6 @@ func (x *IngestionError_Details_Error) GetChangeId() string { return "" } -type ActionIngestionLogResponse_Error struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` - Code int32 `protobuf:"varint,2,opt,name=code,proto3" json:"code,omitempty"` -} - -func (x *ActionIngestionLogResponse_Error) Reset() { - *x = ActionIngestionLogResponse_Error{} - mi := &file_diode_v1_reconciler_proto_msgTypes[13] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ActionIngestionLogResponse_Error) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ActionIngestionLogResponse_Error) ProtoMessage() {} - -func (x *ActionIngestionLogResponse_Error) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[13] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ActionIngestionLogResponse_Error.ProtoReflect.Descriptor instead. -func (*ActionIngestionLogResponse_Error) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{10, 0} -} - -func (x *ActionIngestionLogResponse_Error) GetMessage() string { - if x != nil { - return x.Message - } - return "" -} - -func (x *ActionIngestionLogResponse_Error) GetCode() int32 { - if x != nil { - return x.Code - } - return 0 -} - var File_diode_v1_reconciler_proto protoreflect.FileDescriptor var file_diode_v1_reconciler_proto_rawDesc = []byte{ @@ -1128,10 +1075,13 @@ var file_diode_v1_reconciler_proto_rawDesc = []byte{ 0x64, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x6f, 0x5f, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, - 0x6f, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x22, 0x2f, 0x0a, 0x09, 0x43, 0x68, 0x61, 0x6e, + 0x6f, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x22, 0x5f, 0x0a, 0x09, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0xcc, 0x03, 0x0a, 0x0c, 0x49, 0x6e, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x09, 0x62, 0x72, 0x61, + 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, + 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, + 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x22, 0xcc, 0x03, 0x0a, 0x0c, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, @@ -1203,63 +1153,55 @@ var file_diode_v1_reconciler_proto_rawDesc = []byte{ 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x09, 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x62, 0x72, - 0x61, 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x22, 0xc1, 0x01, 0x0a, 0x1a, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x03, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, - 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x03, 0x6c, 0x6f, 0x67, - 0x12, 0x42, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x2a, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x06, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x73, 0x1a, 0x35, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x2a, 0x50, 0x0a, 0x05, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51, 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, - 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x43, 0x4f, 0x4e, 0x43, 0x49, 0x4c, 0x45, 0x44, 0x10, - 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, - 0x0a, 0x4e, 0x4f, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x53, 0x10, 0x04, 0x2a, 0x5a, 0x0a, - 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x41, - 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x49, - 0x46, 0x46, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, - 0x50, 0x50, 0x4c, 0x59, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x32, 0xdf, 0x02, 0x0a, 0x11, 0x52, 0x65, - 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, - 0x7f, 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, - 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, - 0x2d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, - 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, - 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, - 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, - 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x68, 0x0a, 0x15, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x64, 0x69, 0x6f, 0x64, - 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, - 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x27, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, - 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, - 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, 0x0a, 0x12, 0x41, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, - 0x12, 0x23, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, - 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0xa4, 0x01, 0x0a, 0x0c, - 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x52, 0x65, - 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, - 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, - 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x64, 0x69, - 0x6f, 0x64, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, - 0x72, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x44, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, - 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0xe2, - 0x02, 0x14, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x3a, 0x3a, - 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6e, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x22, 0x46, 0x0a, 0x1a, 0x41, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x03, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, + 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x03, 0x6c, 0x6f, 0x67, 0x2a, + 0x50, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, + 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51, 0x55, 0x45, + 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x43, 0x4f, 0x4e, 0x43, 0x49, + 0x4c, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, + 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x4f, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x53, 0x10, + 0x04, 0x2a, 0x5a, 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x16, 0x0a, 0x12, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x43, 0x54, 0x49, 0x4f, + 0x4e, 0x5f, 0x44, 0x49, 0x46, 0x46, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x43, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x50, 0x4c, 0x59, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, + 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x32, 0xdf, 0x02, + 0x0a, 0x11, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x7f, 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, + 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, + 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, + 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x68, 0x0a, 0x15, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, + 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, + 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, + 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, + 0x0a, 0x12, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, + 0x6e, 0x4c, 0x6f, 0x67, 0x12, 0x23, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, + 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x69, 0x6f, 0x64, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x67, 0x65, 0x73, + 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0xa4, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, + 0x42, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6e, 0x65, 0x74, 0x62, 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, + 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x67, 0x65, + 0x6e, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, + 0x63, 0x69, 0x6c, 0x65, 0x72, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x44, 0x58, 0x58, 0xaa, 0x02, 0x08, + 0x44, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, + 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x44, 0x69, 0x6f, + 0x64, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1275,7 +1217,7 @@ func file_diode_v1_reconciler_proto_rawDescGZIP() []byte { } var file_diode_v1_reconciler_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_diode_v1_reconciler_proto_goTypes = []any{ (State)(0), // 0: diode.v1.State (ActionType)(0), // 1: diode.v1.ActionType @@ -1292,14 +1234,13 @@ var file_diode_v1_reconciler_proto_goTypes = []any{ (*ActionIngestionLogResponse)(nil), // 12: diode.v1.ActionIngestionLogResponse (*IngestionError_Details)(nil), // 13: diode.v1.IngestionError.Details (*IngestionError_Details_Error)(nil), // 14: diode.v1.IngestionError.Details.Error - (*ActionIngestionLogResponse_Error)(nil), // 15: diode.v1.ActionIngestionLogResponse.Error - (*diodepb.Entity)(nil), // 16: diode.v1.Entity + (*diodepb.Entity)(nil), // 15: diode.v1.Entity } var file_diode_v1_reconciler_proto_depIdxs = []int32{ 2, // 0: diode.v1.RetrieveIngestionDataSourcesResponse.ingestion_data_sources:type_name -> diode.v1.IngestionDataSource 13, // 1: diode.v1.IngestionError.details:type_name -> diode.v1.IngestionError.Details 0, // 2: diode.v1.IngestionLog.state:type_name -> diode.v1.State - 16, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity + 15, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity 5, // 4: diode.v1.IngestionLog.error:type_name -> diode.v1.IngestionError 7, // 5: diode.v1.IngestionLog.change_set:type_name -> diode.v1.ChangeSet 0, // 6: diode.v1.RetrieveIngestionLogsRequest.state:type_name -> diode.v1.State @@ -1307,19 +1248,18 @@ var file_diode_v1_reconciler_proto_depIdxs = []int32{ 6, // 8: diode.v1.RetrieveIngestionLogsResponse.metrics:type_name -> diode.v1.IngestionMetrics 1, // 9: diode.v1.ActionIngestionLogRequest.action:type_name -> diode.v1.ActionType 8, // 10: diode.v1.ActionIngestionLogResponse.log:type_name -> diode.v1.IngestionLog - 15, // 11: diode.v1.ActionIngestionLogResponse.errors:type_name -> diode.v1.ActionIngestionLogResponse.Error - 14, // 12: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error - 3, // 13: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest - 9, // 14: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest - 11, // 15: diode.v1.ReconcilerService.ActionIngestionLog:input_type -> diode.v1.ActionIngestionLogRequest - 4, // 16: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse - 10, // 17: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse - 12, // 18: diode.v1.ReconcilerService.ActionIngestionLog:output_type -> diode.v1.ActionIngestionLogResponse - 16, // [16:19] is the sub-list for method output_type - 13, // [13:16] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 14, // 11: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error + 3, // 12: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest + 9, // 13: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest + 11, // 14: diode.v1.ReconcilerService.ActionIngestionLog:input_type -> diode.v1.ActionIngestionLogRequest + 4, // 15: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse + 10, // 16: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse + 12, // 17: diode.v1.ReconcilerService.ActionIngestionLog:output_type -> diode.v1.ActionIngestionLogResponse + 15, // [15:18] is the sub-list for method output_type + 12, // [12:15] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name } func init() { file_diode_v1_reconciler_proto_init() } @@ -1327,6 +1267,7 @@ func file_diode_v1_reconciler_proto_init() { if File_diode_v1_reconciler_proto != nil { return } + file_diode_v1_reconciler_proto_msgTypes[5].OneofWrappers = []any{} file_diode_v1_reconciler_proto_msgTypes[7].OneofWrappers = []any{} file_diode_v1_reconciler_proto_msgTypes[9].OneofWrappers = []any{} type x struct{} @@ -1335,7 +1276,7 @@ func file_diode_v1_reconciler_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_diode_v1_reconciler_proto_rawDesc, NumEnums: 2, - NumMessages: 14, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go index 79187394..eaadc699 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go @@ -708,6 +708,10 @@ func (m *ChangeSet) validate(all bool) error { // no validation rules for Data + if m.BranchId != nil { + // no validation rules for BranchId + } + if len(errors) > 0 { return ChangeSetMultiError(errors) } @@ -1441,40 +1445,6 @@ func (m *ActionIngestionLogResponse) validate(all bool) error { } } - for idx, item := range m.GetErrors() { - _, _ = idx, item - - if all { - switch v := interface{}(item).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - case interface{ Validate() error }: - if err := v.Validate(); err != nil { - errors = append(errors, ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - } - } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { - if err := v.Validate(); err != nil { - return ActionIngestionLogResponseValidationError{ - field: fmt.Sprintf("Errors[%v]", idx), - reason: "embedded message failed validation", - cause: err, - } - } - } - - } - if len(errors) > 0 { return ActionIngestionLogResponseMultiError(errors) } @@ -1801,112 +1771,3 @@ var _ interface { Cause() error ErrorName() string } = IngestionError_Details_ErrorValidationError{} - -// Validate checks the field values on ActionIngestionLogResponse_Error with -// the rules defined in the proto definition for this message. If any rules -// are violated, the first error encountered is returned, or nil if there are -// no violations. -func (m *ActionIngestionLogResponse_Error) Validate() error { - return m.validate(false) -} - -// ValidateAll checks the field values on ActionIngestionLogResponse_Error with -// the rules defined in the proto definition for this message. If any rules -// are violated, the result is a list of violation errors wrapped in -// ActionIngestionLogResponse_ErrorMultiError, or nil if none found. -func (m *ActionIngestionLogResponse_Error) ValidateAll() error { - return m.validate(true) -} - -func (m *ActionIngestionLogResponse_Error) validate(all bool) error { - if m == nil { - return nil - } - - var errors []error - - // no validation rules for Message - - // no validation rules for Code - - if len(errors) > 0 { - return ActionIngestionLogResponse_ErrorMultiError(errors) - } - - return nil -} - -// ActionIngestionLogResponse_ErrorMultiError is an error wrapping multiple -// validation errors returned by -// ActionIngestionLogResponse_Error.ValidateAll() if the designated -// constraints aren't met. -type ActionIngestionLogResponse_ErrorMultiError []error - -// Error returns a concatenation of all the error messages it wraps. -func (m ActionIngestionLogResponse_ErrorMultiError) Error() string { - var msgs []string - for _, err := range m { - msgs = append(msgs, err.Error()) - } - return strings.Join(msgs, "; ") -} - -// AllErrors returns a list of validation violation errors. -func (m ActionIngestionLogResponse_ErrorMultiError) AllErrors() []error { return m } - -// ActionIngestionLogResponse_ErrorValidationError is the validation error -// returned by ActionIngestionLogResponse_Error.Validate if the designated -// constraints aren't met. -type ActionIngestionLogResponse_ErrorValidationError struct { - field string - reason string - cause error - key bool -} - -// Field function returns field value. -func (e ActionIngestionLogResponse_ErrorValidationError) Field() string { return e.field } - -// Reason function returns reason value. -func (e ActionIngestionLogResponse_ErrorValidationError) Reason() string { return e.reason } - -// Cause function returns cause value. -func (e ActionIngestionLogResponse_ErrorValidationError) Cause() error { return e.cause } - -// Key function returns key value. -func (e ActionIngestionLogResponse_ErrorValidationError) Key() bool { return e.key } - -// ErrorName returns error name. -func (e ActionIngestionLogResponse_ErrorValidationError) ErrorName() string { - return "ActionIngestionLogResponse_ErrorValidationError" -} - -// Error satisfies the builtin error interface -func (e ActionIngestionLogResponse_ErrorValidationError) Error() string { - cause := "" - if e.cause != nil { - cause = fmt.Sprintf(" | caused by: %v", e.cause) - } - - key := "" - if e.key { - key = "key for " - } - - return fmt.Sprintf( - "invalid %sActionIngestionLogResponse_Error.%s: %s%s", - key, - e.field, - e.reason, - cause) -} - -var _ error = ActionIngestionLogResponse_ErrorValidationError{} - -var _ interface { - Field() string - Reason() string - Key() bool - Cause() error - ErrorName() string -} = ActionIngestionLogResponse_ErrorValidationError{} diff --git a/diode-server/ingester/component_test.go b/diode-server/ingester/component_test.go index edc8b805..f78858c8 100644 --- a/diode-server/ingester/component_test.go +++ b/diode-server/ingester/component_test.go @@ -17,6 +17,7 @@ import ( pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" "github.com/netboxlabs/diode/diode-server/ingester" + pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler" "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) @@ -72,7 +73,8 @@ const bufSize = 1024 * 1024 func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) - server, err := reconciler.NewServer(ctx, logger, mockRepository) + nbClientMock := pluginmocks.NewNetBoxAPI(t) + server, err := reconciler.NewServer(ctx, logger, nbClientMock, mockRepository) require.NoError(t, err) errChan := make(chan error, 1) diff --git a/diode-server/netboxdiodeplugin/mocks/netboxapi.go b/diode-server/netboxdiodeplugin/mocks/netboxapi.go index 48b610a1..fb7ab9c2 100644 --- a/diode-server/netboxdiodeplugin/mocks/netboxapi.go +++ b/diode-server/netboxdiodeplugin/mocks/netboxapi.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.1. DO NOT EDIT. package mocks diff --git a/diode-server/reconciler/applier/applier.go b/diode-server/reconciler/applier/applier.go index e3aa71aa..439743ef 100644 --- a/diode-server/reconciler/applier/applier.go +++ b/diode-server/reconciler/applier/applier.go @@ -9,7 +9,7 @@ import ( ) // ApplyChangeSet applies a change set to NetBox -func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, branchID string, nbClient netboxdiodeplugin.NetBoxAPI) error { +func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, nbClient netboxdiodeplugin.NetBoxAPI) error { changes := make([]netboxdiodeplugin.Change, 0) for _, change := range cs.ChangeSet { changes = append(changes, netboxdiodeplugin.Change{ @@ -25,8 +25,9 @@ func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.Chang req := netboxdiodeplugin.ChangeSetRequest{ ChangeSetID: cs.ChangeSetID, ChangeSet: changes, - // TODO(mfiedorowicz): take branch from ChangeSet, remove parameter - BranchID: branchID, + } + if cs.BranchID != nil { + req.BranchID = *cs.BranchID } resp, err := nbClient.ApplyChangeSet(ctx, req) diff --git a/diode-server/reconciler/applier/applier_test.go b/diode-server/reconciler/applier/applier_test.go index caf9683f..d70c5893 100644 --- a/diode-server/reconciler/applier/applier_test.go +++ b/diode-server/reconciler/applier/applier_test.go @@ -61,7 +61,7 @@ func TestApplyChangeSet(t *testing.T) { mockNetBoxAPI.On("ApplyChangeSet", ctx, req).Return(resp, nil) - err := applier.ApplyChangeSet(ctx, logger, cs, "", mockNetBoxAPI) + err := applier.ApplyChangeSet(ctx, logger, cs, mockNetBoxAPI) assert.NoError(t, err) mockNetBoxAPI.AssertExpectations(t) } diff --git a/diode-server/reconciler/differ/differ.go b/diode-server/reconciler/differ/differ.go index e7f00ade..c35b5327 100644 --- a/diode-server/reconciler/differ/differ.go +++ b/diode-server/reconciler/differ/differ.go @@ -93,11 +93,14 @@ func Diff(ctx context.Context, entity IngestEntity, branchID string, netboxAPI n ObjectID: objectID, ObjectVersion: nil, Data: obj.Data(), - // TODO(mfiedorowicz): include branchID }) } - return &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil + cs := &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes} + if branchID != "" { + cs.BranchID = &branchID + } + return cs, nil } func retrieveObjectState(ctx context.Context, netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData, branchID string) (netbox.ComparableData, error) { diff --git a/diode-server/reconciler/generate_change_set.go b/diode-server/reconciler/generate_change_set.go new file mode 100644 index 00000000..0ca6b5db --- /dev/null +++ b/diode-server/reconciler/generate_change_set.go @@ -0,0 +1,56 @@ +package reconciler + +import ( + "context" + "errors" + "log/slog" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" + "github.com/netboxlabs/diode/diode-server/sentry" +) + +func generateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string, nbClient netboxdiodeplugin.NetBoxAPI, repository Repository, logger *slog.Logger) (*int32, *changeset.ChangeSet, error) { + ingestEntity := differ.IngestEntity{ + RequestID: ingestionLog.GetId(), // ???(ltucker): GetRequestId() ? + DataType: ingestionLog.GetDataType(), + Entity: ingestionLog.GetEntity(), + State: int(ingestionLog.GetState()), + } + + changeSet, err := differ.Diff(ctx, ingestEntity, branchID, nbClient) + if err != nil { + tags := map[string]string{ + "request_id": ingestEntity.RequestID, + } + contextMap := map[string]any{ + "request_id": ingestEntity.RequestID, + "data_type": ingestEntity.DataType, + } + sentry.CaptureError(err, tags, "Ingest Entity", contextMap) + ingestionErr := extractIngestionError(err) + + if err2 := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err2 != nil { + err = errors.Join(err, err2) + } + return nil, nil, err + } + + changeSetID, err := repository.CreateChangeSet(ctx, *changeSet, ingestionLogID) + if err != nil { + return nil, nil, err + } + + if len(changeSet.ChangeSet) == 0 { + if err := repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { + logger.Warn("failed to update ingestion log state (ignored)", "ingestionLogID", ingestionLog.GetId(), "error", err) + // TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log. + // return nil, err + } + } + + logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ChangeSetID, "ingestionLogID", ingestionLog.GetId()) + return changeSetID, changeSet, nil +} diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 09a8914b..f484fd49 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -20,7 +20,6 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler/applier" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" - "github.com/netboxlabs/diode/diode-server/reconciler/differ" "github.com/netboxlabs/diode/diode-server/sentry" ) @@ -261,57 +260,22 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan return } - ingestEntity := differ.IngestEntity{ - RequestID: msg.ingestionLog.GetId(), - DataType: msg.ingestionLog.GetDataType(), - Entity: msg.ingestionLog.GetEntity(), - State: int(msg.ingestionLog.GetState()), - } - - changeSet, err := differ.Diff(ctx, ingestEntity, "", p.nbClient) - if err != nil { - tags := map[string]string{ - "request_id": ingestEntity.RequestID, - } - contextMap := map[string]any{ - "request_id": ingestEntity.RequestID, - "data_type": ingestEntity.DataType, - } - sentry.CaptureError(err, tags, "Ingest Entity", contextMap) - p.logger.Debug("failed to prepare change set", "ingestionLogID", msg.ingestionLog.GetId(), "error", err) - - msg.errors = append(msg.errors, fmt.Errorf("failed to prepare change set: %v", err)) - - ingestionErr := extractIngestionError(err) - - if err = p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { - msg.errors = append(msg.errors, err) - } - break - } - - msg.changeSet = changeSet - - id, err := p.repository.CreateChangeSet(ctx, *changeSet, msg.ingestionLogID) + id, changeSet, err := generateChangeSet(ctx, msg.ingestionLogID, msg.ingestionLog, "", p.nbClient, p.repository, p.logger) if err != nil { - msg.errors = append(msg.errors, fmt.Errorf("failed to create change set: %v", err)) + p.logger.Error("error generating changeset", "error", err) + continue } - if len(changeSet.ChangeSet) > 0 { + if changeSet != nil && len(changeSet.ChangeSet) > 0 { if applyChangeSetChan != nil { applyChangeSetChan <- IngestionLogToProcess{ ingestionLogID: msg.ingestionLogID, ingestionLog: msg.ingestionLog, changeSetID: *id, - changeSet: msg.changeSet, + changeSet: changeSet, } } - } else { - if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { - msg.errors = append(msg.errors, err) - } } - p.logger.Debug("change set generated", "id", id, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID) } } }() @@ -342,10 +306,9 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha return } - if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, "", p.nbClient); err != nil { + if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, p.nbClient); err != nil { p.logger.Debug("failed to apply change set", "id", msg.changeSetID, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID, "error", err) msg.errors = append(msg.errors, fmt.Errorf("failed to apply change set: %v", err)) - ingestionErr := extractIngestionError(err) if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { diff --git a/diode-server/reconciler/mocks/client.go b/diode-server/reconciler/mocks/client.go index f2c2e177..842ecd68 100644 --- a/diode-server/reconciler/mocks/client.go +++ b/diode-server/reconciler/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.1. DO NOT EDIT. package mocks @@ -25,7 +25,7 @@ func (_m *Client) EXPECT() *Client_Expecter { return &Client_Expecter{mock: &_m.Mock} } -// Close provides a mock function with no fields +// Close provides a mock function with given fields: func (_m *Client) Close() error { ret := _m.Called() diff --git a/diode-server/reconciler/mocks/redisclient.go b/diode-server/reconciler/mocks/redisclient.go index e2ede4a9..74586f30 100644 --- a/diode-server/reconciler/mocks/redisclient.go +++ b/diode-server/reconciler/mocks/redisclient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.1. DO NOT EDIT. package mocks @@ -23,7 +23,7 @@ func (_m *RedisClient) EXPECT() *RedisClient_Expecter { return &RedisClient_Expecter{mock: &_m.Mock} } -// Close provides a mock function with no fields +// Close provides a mock function with given fields: func (_m *RedisClient) Close() error { ret := _m.Called() @@ -238,7 +238,7 @@ func (_c *RedisClient_Ping_Call) RunAndReturn(run func(context.Context) *redis.S return _c } -// Pipeline provides a mock function with no fields +// Pipeline provides a mock function with given fields: func (_m *RedisClient) Pipeline() redis.Pipeliner { ret := _m.Called() diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 176915e0..1376cb54 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.1. DO NOT EDIT. package mocks @@ -203,6 +203,74 @@ func (_c *Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Cont return _c } +// RetrieveIngestionLogByExternalID provides a mock function with given fields: ctx, uuid +func (_m *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) { + ret := _m.Called(ctx, uuid) + + if len(ret) == 0 { + panic("no return value specified for RetrieveIngestionLogByExternalID") + } + + var r0 *int32 + var r1 *reconcilerpb.IngestionLog + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)); ok { + return rf(ctx, uuid) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *int32); ok { + r0 = rf(ctx, uuid) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) *reconcilerpb.IngestionLog); ok { + r1 = rf(ctx, uuid) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*reconcilerpb.IngestionLog) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, uuid) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Repository_RetrieveIngestionLogByExternalID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveIngestionLogByExternalID' +type Repository_RetrieveIngestionLogByExternalID_Call struct { + *mock.Call +} + +// RetrieveIngestionLogByExternalID is a helper method to define mock.On call +// - ctx context.Context +// - uuid string +func (_e *Repository_Expecter) RetrieveIngestionLogByExternalID(ctx interface{}, uuid interface{}) *Repository_RetrieveIngestionLogByExternalID_Call { + return &Repository_RetrieveIngestionLogByExternalID_Call{Call: _e.mock.On("RetrieveIngestionLogByExternalID", ctx, uuid)} +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Run(run func(ctx context.Context, uuid string)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) Return(_a0 *int32, _a1 *reconcilerpb.IngestionLog, _a2 error) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *Repository_RetrieveIngestionLogByExternalID_Call) RunAndReturn(run func(context.Context, string) (*int32, *reconcilerpb.IngestionLog, error)) *Repository_RetrieveIngestionLogByExternalID_Call { + _c.Call.Return(run) + return _c +} + // RetrieveIngestionLogs provides a mock function with given fields: ctx, filter, limit, offset func (_m *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) { ret := _m.Called(ctx, filter, limit, offset) diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index db1fb611..4389db67 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -11,6 +11,7 @@ import ( type Repository interface { CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error + RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error) diff --git a/diode-server/reconciler/server.go b/diode-server/reconciler/server.go index 84484e8f..95131855 100644 --- a/diode-server/reconciler/server.go +++ b/diode-server/reconciler/server.go @@ -2,6 +2,7 @@ package reconciler import ( "context" + "errors" "fmt" "log/slog" "net" @@ -16,6 +17,8 @@ import ( "google.golang.org/grpc/status" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" ) const ( @@ -37,10 +40,11 @@ type Server struct { redisClient RedisClient repository Repository apiKeys APIKeys + nbClient netboxdiodeplugin.NetBoxAPI } // NewServer creates a new reconciler server -func NewServer(ctx context.Context, logger *slog.Logger, repository Repository) (*Server, error) { +func NewServer(ctx context.Context, logger *slog.Logger, nbClient netboxdiodeplugin.NetBoxAPI, repository Repository) (*Server, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -75,6 +79,7 @@ func NewServer(ctx context.Context, logger *slog.Logger, repository Repository) redisClient: redisClient, repository: repository, apiKeys: apiKeys, + nbClient: nbClient, } reconcilerpb.RegisterReconcilerServiceServer(grpcServer, component) @@ -115,6 +120,74 @@ func (s *Server) Stop() error { return s.redisClient.Close() } +// ActionIngestionLog takes an action on an IngestionLog +func (s *Server) ActionIngestionLog(ctx context.Context, in *reconcilerpb.ActionIngestionLogRequest) (*reconcilerpb.ActionIngestionLogResponse, error) { + var response *reconcilerpb.ActionIngestionLogResponse + var err error + + switch in.Action { + case reconcilerpb.ActionType_ACTION_DIFF: + response, err = s.actionIngestionLogDiff(ctx, in) + default: + return nil, status.Errorf(codes.Unimplemented, "ActionIngestionLog Action %v not implemented", in.Action) + } + return response, mapToGRPCStatus(err).Err() +} + +func (s *Server) actionIngestionLogDiff(ctx context.Context, in *reconcilerpb.ActionIngestionLogRequest) (*reconcilerpb.ActionIngestionLogResponse, error) { + id, log, err := s.repository.RetrieveIngestionLogByExternalID(ctx, in.IngestionLogId) + if err != nil { + return nil, err + } + + var branchID string + if in.BranchId != nil { + branchID = *in.BranchId + } + + _, changeSet, err := generateChangeSet(ctx, *id, log, branchID, s.nbClient, s.repository, s.logger) + if err != nil { + s.logger.Error("error generating changeset", "error", err) + return nil, err + } + if changeSet == nil { + err = fmt.Errorf("no changeset generated") + s.logger.Error("error generating changeset", "error", err) + return nil, err + } + + log.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} + if len(changeSet.ChangeSet) > 0 { + b, err := changeset.CompressChangeSet(changeSet) + if err != nil { + err = fmt.Errorf("failed to compress change set: %w", err) + s.logger.Error("error formatting changeset", "error", err) + return nil, err + } + log.ChangeSet.Data = b + } + + return &reconcilerpb.ActionIngestionLogResponse{Log: log}, nil +} + +func mapToGRPCStatus(err error) *status.Status { + mapped, ok := status.FromError(err) + if ok { + return mapped // this is already a known status or nil + } + + // try to identify anything well known that is not automatically handled ... + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return status.FromContextError(err) + } + // ... + + // pack details, codes for some standard error structure here? + // mapped = mapped.WithDetails(...) + + return mapped +} + // RetrieveIngestionDataSources retrieves ingestion data sources func (s *Server) RetrieveIngestionDataSources(_ context.Context, in *reconcilerpb.RetrieveIngestionDataSourcesRequest) (*reconcilerpb.RetrieveIngestionDataSourcesResponse, error) { if err := validateRetrieveIngestionDataSourcesRequest(in); err != nil { diff --git a/diode-server/reconciler/server_test.go b/diode-server/reconciler/server_test.go index fbc1f9e0..1bcce555 100644 --- a/diode-server/reconciler/server_test.go +++ b/diode-server/reconciler/server_test.go @@ -1,7 +1,10 @@ package reconciler_test import ( + "bytes" "context" + "encoding/json" + "io" "log/slog" "net" "os" @@ -9,17 +12,32 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/andybalholm/brotli" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netbox" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) -func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*reconciler.Server, *grpc.ClientConn) { +type TestServer struct { + *reconciler.Server + mockRepository *mocks.Repository + mockNetBoxClient *pluginmocks.NetBoxAPI +} + +func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*TestServer, *grpc.ClientConn) { setupEnv(redisAddr) defer teardownEnv() @@ -28,7 +46,8 @@ func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*reco logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) - server, err := reconciler.NewServer(ctx, logger, mockRepository) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + server, err := reconciler.NewServer(ctx, logger, mockNetBoxClient, mockRepository) require.NoError(t, err) pb.RegisterReconcilerServiceServer(s, server) @@ -50,7 +69,12 @@ func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*reco default: } - return server, conn + testServer := &TestServer{ + Server: server, + mockRepository: mockRepository, + mockNetBoxClient: mockNetBoxClient, + } + return testServer, conn } func TestNewServer(t *testing.T) { @@ -63,7 +87,8 @@ func TestNewServer(t *testing.T) { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) - server, err := reconciler.NewServer(ctx, logger, mockRepository) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + server, err := reconciler.NewServer(ctx, logger, mockNetBoxClient, mockRepository) require.NoError(t, err) require.NotNil(t, server) @@ -157,3 +182,325 @@ func TestRetrieveIngestionDataSources(t *testing.T) { }) } } + +func decodeChangeSet(t *testing.T, data []byte) changeset.ChangeSet { + t.Helper() + var cs changeset.ChangeSet + if len(data) > 0 { + data, err := io.ReadAll(brotli.NewReader(bytes.NewReader(data))) + require.NoError(t, err) + require.NoError(t, json.Unmarshal(data, &cs)) + } + return cs +} + +func TestActionIngestionLogDiff(t *testing.T) { + type mockRetrieveIngestionLogByExternalID struct { + uuid string + dbid int32 + log *pb.IngestionLog + err error + } + + type mockRetrieveObjectState struct { + objectType string + objectID int + queryParams map[string]string + objectChangeID int + object netbox.ComparableData + } + + type mockCreateChangeSet struct { + ingestionLogDBID int32 + id int32 + } + + type mockUpdateIngestionLogStateWithError struct { + ingestionLogDBID int32 + state pb.State + err error + } + + type changeExpectation struct { + objectType string + changeType string + data map[string]interface{} + } + + tests := []struct { + name string + ingestionLogID string + branchID *string + + retrieveIngestionLogByExternalIDs []mockRetrieveIngestionLogByExternalID + retrieveObjectStates []mockRetrieveObjectState + createChangeSets []mockCreateChangeSet + updateIngestionLogStateWithError []mockUpdateIngestionLogStateWithError + + expectBranchID *string + expectChanges []changeExpectation + errorCode codes.Code + errorMessage string + hasError bool + }{ + { + name: "diff with no existing object", + ingestionLogID: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + retrieveIngestionLogByExternalIDs: []mockRetrieveIngestionLogByExternalID{ + { + uuid: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + dbid: 1234, + log: &pb.IngestionLog{ + Id: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + DataType: netbox.DcimSiteObjectType, + State: pb.State_QUEUED, + RequestId: "1abf059c-496f-4037-83c2-0e9b1d021e85", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + }, + }, + }, + }, + }, + }, + retrieveObjectStates: []mockRetrieveObjectState{ + { + objectType: "dcim.site", + objectID: 0, + queryParams: map[string]string{"q": "test-site-1"}, + objectChangeID: 0, + object: &netbox.DcimSiteDataWrapper{ + Site: nil, + }, + }, + }, + createChangeSets: []mockCreateChangeSet{ + {ingestionLogDBID: 1234, id: 1235}, + }, + + expectChanges: []changeExpectation{ + { + objectType: "dcim.site", + changeType: "create", + data: map[string]interface{}{ + "name": "test-site-1", + "slug": "test-site-1", + "status": "active", + }, + }, + }, + }, + { + name: "diff with no existing object (no change)", + ingestionLogID: "9a48b926-7bc2-46be-a191-db492e76fd57", + retrieveIngestionLogByExternalIDs: []mockRetrieveIngestionLogByExternalID{ + { + uuid: "9a48b926-7bc2-46be-a191-db492e76fd57", + dbid: 1234, + log: &pb.IngestionLog{ + Id: "9a48b926-7bc2-46be-a191-db492e76fd57", + DataType: netbox.DcimSiteObjectType, + State: pb.State_QUEUED, + RequestId: "85bcc7d4-768d-4c93-8da0-03b534155633", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + }, + }, + }, + }, + }, + }, + retrieveObjectStates: []mockRetrieveObjectState{ + { + objectType: "dcim.site", + objectID: 0, + queryParams: map[string]string{"q": "test-site-1"}, + objectChangeID: 0, + object: &netbox.DcimSiteDataWrapper{ + Site: &netbox.DcimSite{ + ID: 1235, + Name: "test-site-1", + Slug: "test-site-1", + }, + }, + }, + }, + createChangeSets: []mockCreateChangeSet{ + {ingestionLogDBID: 1234, id: 1235}, + }, + updateIngestionLogStateWithError: []mockUpdateIngestionLogStateWithError{ + { + ingestionLogDBID: 1234, + state: pb.State_NO_CHANGES, + }, + }, + expectChanges: []changeExpectation{}, + }, + { + name: "diff with existing object update tag", + ingestionLogID: "5b4e7a4d-9726-4787-9c30-0a683b03dc47", + retrieveIngestionLogByExternalIDs: []mockRetrieveIngestionLogByExternalID{ + { + uuid: "5b4e7a4d-9726-4787-9c30-0a683b03dc47", + dbid: 1234, + log: &pb.IngestionLog{ + Id: "5b4e7a4d-9726-4787-9c30-0a683b03dc47", + DataType: netbox.DcimSiteObjectType, + State: pb.State_QUEUED, + RequestId: "75de6a62-3bfd-4eba-9dbe-4d266b4e7b6a", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + Tags: []*diodepb.Tag{ + { + Name: "tag 1", + }, + }, + }, + }, + }, + }, + }, + }, + retrieveObjectStates: []mockRetrieveObjectState{ + { + objectType: "dcim.site", + objectID: 0, + queryParams: map[string]string{"q": "test-site-1"}, + objectChangeID: 0, + object: &netbox.DcimSiteDataWrapper{ + Site: &netbox.DcimSite{ + ID: 1235, + Name: "test-site-1", + Slug: "test-site-1", + }, + }, + }, + { + objectType: "extras.tag", + objectID: 0, + queryParams: map[string]string{"q": "tag 1"}, + objectChangeID: 0, + object: &netbox.TagDataWrapper{ + Tag: &netbox.Tag{ + ID: 1, + Name: "tag 1", + Slug: "tag-1", + }, + }, + }, + }, + createChangeSets: []mockCreateChangeSet{ + {ingestionLogDBID: 1234, id: 1235}, + }, + expectChanges: []changeExpectation{ + { + objectType: "dcim.site", + changeType: "update", + data: map[string]interface{}{ + "id": float64(1235), + "name": "test-site-1", + "slug": "test-site-1", + "tags": []interface{}{ + map[string]interface{}{ + "id": float64(1), + "name": "tag 1", + "slug": "tag-1", + }, + }, + }, + }, + }, + }, + { + name: "error if missing ingestion log", + ingestionLogID: "c4923cff-0d06-4a6a-93b8-23e6636bb42a", + retrieveIngestionLogByExternalIDs: []mockRetrieveIngestionLogByExternalID{ + { + uuid: "c4923cff-0d06-4a6a-93b8-23e6636bb42a", + err: status.Errorf(codes.NotFound, "not found"), + }, + }, + hasError: true, + errorCode: codes.NotFound, + errorMessage: "not found", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + s := miniredis.RunT(t) + defer s.Close() + + server, conn := startTestServer(ctx, t, s.Addr()) + for _, m := range tt.retrieveIngestionLogByExternalIDs { + server.mockRepository.EXPECT().RetrieveIngestionLogByExternalID(ctx, m.uuid).Return(&m.dbid, m.log, m.err) + } + for _, m := range tt.retrieveObjectStates { + server.mockNetBoxClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ + ObjectType: m.objectType, + ObjectID: m.objectID, + Params: m.queryParams, + }).Return(&netboxdiodeplugin.ObjectState{ + ObjectID: m.objectID, + ObjectType: m.objectType, + ObjectChangeID: m.objectChangeID, + Object: m.object, + }, nil) + } + for _, m := range tt.createChangeSets { + server.mockRepository.EXPECT().CreateChangeSet(ctx, mock.AnythingOfType("changeset.ChangeSet"), m.ingestionLogDBID).Return(&m.id, nil) + } + for _, m := range tt.updateIngestionLogStateWithError { + server.mockRepository.EXPECT().UpdateIngestionLogStateWithError(ctx, m.ingestionLogDBID, m.state, mock.Anything).Return(m.err) + } + + req := &pb.ActionIngestionLogRequest{ + Action: pb.ActionType_ACTION_DIFF, + IngestionLogId: tt.ingestionLogID, + BranchId: tt.branchID, + } + + resp, err := server.ActionIngestionLog(ctx, req) + if tt.hasError { + require.Error(t, err) + require.Nil(t, resp) + require.Equal(t, tt.errorCode, status.Code(err)) + require.Contains(t, err.Error(), tt.errorMessage) + } else { + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Log.ChangeSet) + + changeSet := decodeChangeSet(t, resp.Log.ChangeSet.Data) + if tt.expectChanges != nil { + require.Len(t, changeSet.ChangeSet, len(tt.expectChanges)) + for i, ex := range tt.expectChanges { + change := changeSet.ChangeSet[i] + require.Equal(t, ex.changeType, change.ChangeType) + require.Equal(t, ex.objectType, change.ObjectType) + require.Equal(t, ex.data, change.Data) + } + } + + if tt.branchID == nil { + require.Nil(t, changeSet.BranchID) + } else { + require.NotNil(t, changeSet.BranchID) + require.Equal(t, *tt.branchID, *changeSet.BranchID) + } + } + + err = server.Stop() + require.NoError(t, err) + err = conn.Close() + require.NoError(t, err) + }) + } +}