From 3f46f8a450b070b75792b81ec01c6401fe7a3294 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Sat, 21 Dec 2024 10:46:44 +0000 Subject: [PATCH] feat: postgres datastore implementation - part 2 (#205) Signed-off-by: Michal Fiedorowicz Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Luke Tucker <64618+ltucker@users.noreply.github.com> --- diode-server/cmd/reconciler/main.go | 15 +- .../migrations/00001_ingestion_logs.sql | 12 +- .../postgres/migrations/00002_change_sets.sql | 63 +- .../dbstore/postgres/queries/change_sets.sql | 4 +- .../postgres/queries/ingestion_logs.sql | 39 +- diode-server/dbstore/postgres/repositories.go | 43 -- diode-server/dbstore/postgres/repository.go | 255 +++++++ .../gen/dbstore/postgres/change_sets.sql.go | 26 +- .../dbstore/postgres/ingestion_logs.sql.go | 191 ++++- diode-server/gen/dbstore/postgres/types.go | 30 +- diode-server/go.mod | 1 - diode-server/go.sum | 10 +- diode-server/ingester/component_test.go | 4 +- .../netboxdiodeplugin/mocks/netboxapi.go | 2 +- .../reconciler/changeset/changeset.go | 28 + .../reconciler/ingestion_processor.go | 200 ++--- .../ingestion_processor_internal_test.go | 120 +-- .../reconciler/ingestion_processor_test.go | 17 +- diode-server/reconciler/logs_retriever.go | 269 ++----- .../reconciler/logs_retriever_test.go | 49 -- diode-server/reconciler/migration.go | 164 ---- diode-server/reconciler/migration_test.go | 127 ---- .../reconciler/mocks/changesetrepository.go | 85 --- diode-server/reconciler/mocks/client.go | 4 +- .../mocks/ingestionlogrepository.go | 86 --- diode-server/reconciler/mocks/redisclient.go | 6 +- diode-server/reconciler/mocks/repository.go | 328 ++++++++ diode-server/reconciler/repositories.go | 17 - diode-server/reconciler/repository.go | 17 + diode-server/reconciler/server.go | 6 +- .../reconciler/server_internal_test.go | 700 ++++++++---------- diode-server/reconciler/server_test.go | 7 +- diode-server/sqlc.yaml | 7 + 33 files changed, 1432 insertions(+), 1500 deletions(-) delete mode 100644 diode-server/dbstore/postgres/repositories.go create mode 100644 diode-server/dbstore/postgres/repository.go delete mode 100644 diode-server/reconciler/logs_retriever_test.go delete mode 100644 diode-server/reconciler/migration.go delete mode 100644 diode-server/reconciler/migration_test.go delete mode 100644 diode-server/reconciler/mocks/changesetrepository.go delete mode 100644 diode-server/reconciler/mocks/ingestionlogrepository.go create mode 100644 diode-server/reconciler/mocks/repository.go delete mode 100644 diode-server/reconciler/repositories.go create mode 100644 diode-server/reconciler/repository.go diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index 98f3af4f..dd5a2cd2 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -29,9 +29,11 @@ func main() { dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName) - if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil { - s.Logger().Error("failed to run db migrations", "error", err) - os.Exit(1) + if cfg.MigrationEnabled { + if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil { + s.Logger().Error("failed to run db migrations", "error", err) + os.Exit(1) + } } dbPool, err := pgxpool.New(ctx, dbURL) @@ -41,10 +43,9 @@ func main() { } defer dbPool.Close() - ingestionLogRepo := postgres.NewIngestionLogRepository(dbPool) - changeSetRepo := postgres.NewChangeSetRepository(dbPool) + repository := postgres.NewRepository(dbPool) - ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ingestionLogRepo, changeSetRepo) + ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), repository) if err != nil { s.Logger().Error("failed to instantiate ingestion processor", "error", err) os.Exit(1) @@ -55,7 +56,7 @@ func main() { os.Exit(1) } - gRPCServer, err := reconciler.NewServer(ctx, s.Logger()) + gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository) if err != nil { s.Logger().Error("failed to instantiate gRPC server", "error", err) os.Exit(1) diff --git a/diode-server/dbstore/postgres/migrations/00001_ingestion_logs.sql b/diode-server/dbstore/postgres/migrations/00001_ingestion_logs.sql index b56982fa..2914c41b 100644 --- a/diode-server/dbstore/postgres/migrations/00001_ingestion_logs.sql +++ b/diode-server/dbstore/postgres/migrations/00001_ingestion_logs.sql @@ -3,8 +3,8 @@ -- Create the ingestion_logs table CREATE TABLE IF NOT EXISTS ingestion_logs ( - id SERIAL PRIMARY KEY, - ingestion_log_ksuid CHAR(27) NOT NULL, + id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + external_id VARCHAR(255) NOT NULL, data_type VARCHAR(255), state INTEGER, request_id VARCHAR(255), @@ -21,10 +21,10 @@ CREATE TABLE IF NOT EXISTS ingestion_logs ); -- Create indices -CREATE INDEX IF NOT EXISTS idx_ingestion_logs_ingestion_log_ksuid ON ingestion_logs(ingestion_log_ksuid); -CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs(data_type); -CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs(state); -CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs(request_id); +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_external_id ON ingestion_logs (external_id); +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs (data_type); +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs (state); +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs (request_id); -- +goose Down diff --git a/diode-server/dbstore/postgres/migrations/00002_change_sets.sql b/diode-server/dbstore/postgres/migrations/00002_change_sets.sql index 3944b49f..1fe7766b 100644 --- a/diode-server/dbstore/postgres/migrations/00002_change_sets.sql +++ b/diode-server/dbstore/postgres/migrations/00002_change_sets.sql @@ -3,45 +3,62 @@ -- Create the change_sets table CREATE TABLE IF NOT EXISTS change_sets ( - id SERIAL PRIMARY KEY, - change_set_ksuid CHAR(27) NOT NULL, - ingestion_log_id INTEGER NOT NULL, - branch_name VARCHAR(255), - created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + external_id VARCHAR(255) NOT NULL, + ingestion_log_id INTEGER NOT NULL, + branch_id VARCHAR(255), + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); -- Create indices -CREATE INDEX IF NOT EXISTS idx_change_sets_change_set_ksuid ON change_sets(change_set_ksuid); +CREATE INDEX IF NOT EXISTS idx_change_sets_external_id ON change_sets (external_id); +CREATE INDEX IF NOT EXISTS idx_change_sets_ingestion_log_id ON change_sets (ingestion_log_id); -- Create the changes table CREATE TABLE IF NOT EXISTS changes ( - id SERIAL PRIMARY KEY, - change_ksuid CHAR(27) NOT NULL, - change_set_id INTEGER NOT NULL, - change_type VARCHAR(50) NOT NULL, - object_type VARCHAR(100) NOT NULL, - object_id INTEGER, - object_version INTEGER, - data JSONB, + id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + external_id VARCHAR(255) NOT NULL, + change_set_id INTEGER NOT NULL, + change_type VARCHAR(50) NOT NULL, + object_type VARCHAR(100) NOT NULL, + object_id INTEGER, + object_version INTEGER, + data JSONB, sequence_number INTEGER, - created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); -- Create indices -CREATE INDEX IF NOT EXISTS idx_changes_change_ksuid ON changes(change_ksuid); -CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes(change_set_id); -CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes(change_type); -CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes(object_type); +CREATE INDEX IF NOT EXISTS idx_changes_external_id ON changes (external_id); +CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes (change_set_id); +CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes (change_type); +CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes (object_type); -- Add foreign key constraints -ALTER TABLE change_sets ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs(id); -ALTER TABLE changes ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets(id); +ALTER TABLE change_sets + ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs (id); +ALTER TABLE changes + ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets (id); + +-- Create a view to join ingestion_logs with aggregated change_set and changes +CREATE VIEW v_ingestion_logs_with_change_set AS +SELECT ingestion_logs.*, + row_to_json(change_sets.*) AS change_set, + JSON_AGG(changes.* ORDER BY changes.sequence_number ASC) FILTER ( WHERE changes.id IS NOT NULL ) AS changes +FROM ingestion_logs + LEFT JOIN change_sets on ingestion_logs.id = change_sets.ingestion_log_id + LEFT JOIN changes on change_sets.id = changes.change_set_id +GROUP BY ingestion_logs.id, change_sets.id +ORDER BY ingestion_logs.id DESC, change_sets.id DESC; -- +goose Down +-- Drop the v_ingestion_logs_with_change_sets view +DROP VIEW IF EXISTS v_ingestion_logs_with_change_sets; + -- Drop the changes table DROP TABLE changes; diff --git a/diode-server/dbstore/postgres/queries/change_sets.sql b/diode-server/dbstore/postgres/queries/change_sets.sql index 280f24c6..e1101b24 100644 --- a/diode-server/dbstore/postgres/queries/change_sets.sql +++ b/diode-server/dbstore/postgres/queries/change_sets.sql @@ -1,12 +1,12 @@ -- name: CreateChangeSet :one -INSERT INTO change_sets (change_set_ksuid, ingestion_log_id, branch_name) +INSERT INTO change_sets (external_id, ingestion_log_id, branch_id) VALUES ($1, $2, $3) RETURNING *; -- name: CreateChange :one -INSERT INTO changes (change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data, +INSERT INTO changes (external_id, change_set_id, change_type, object_type, object_id, object_version, data, sequence_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *; diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 50fc3e21..9f8334ac 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -1,4 +1,39 @@ -- name: CreateIngestionLog :one -INSERT INTO ingestion_logs (ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name, +INSERT INTO ingestion_logs (external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, source_metadata) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *; +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) +RETURNING *; + +-- name: UpdateIngestionLogStateWithError :exec +UPDATE ingestion_logs +SET state = $2, + error = $3 +WHERE id = $1 +RETURNING *; + +-- name: CountIngestionLogsPerState :many +SELECT state, COUNT(*) AS count +FROM ingestion_logs +GROUP BY state; + +-- name: RetrieveIngestionLogs :many +SELECT * +FROM ingestion_logs +WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) + AND (data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL) + AND (ingestion_ts >= sqlc.narg('ingestion_ts_start') OR sqlc.narg('ingestion_ts_start') IS NULL) + AND (ingestion_ts <= sqlc.narg('ingestion_ts_end') OR sqlc.narg('ingestion_ts_end') IS NULL) +ORDER BY id DESC +LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); + +-- name: RetrieveIngestionLogsWithChangeSets :many +SELECT v_ingestion_logs_with_change_set.* +FROM v_ingestion_logs_with_change_set +WHERE (v_ingestion_logs_with_change_set.state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) + AND (v_ingestion_logs_with_change_set.data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL) + AND (v_ingestion_logs_with_change_set.ingestion_ts >= sqlc.narg('ingestion_ts_start') OR + sqlc.narg('ingestion_ts_start') IS NULL) + AND (v_ingestion_logs_with_change_set.ingestion_ts <= sqlc.narg('ingestion_ts_end') OR + sqlc.narg('ingestion_ts_end') IS NULL) +ORDER BY v_ingestion_logs_with_change_set.id DESC +LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); diff --git a/diode-server/dbstore/postgres/repositories.go b/diode-server/dbstore/postgres/repositories.go deleted file mode 100644 index 89024cf1..00000000 --- a/diode-server/dbstore/postgres/repositories.go +++ /dev/null @@ -1,43 +0,0 @@ -package postgres - -import ( - "context" - "errors" - - "github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres" - "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" -) - -// IngestionLogRepository allows interacting with ingestion logs. -type IngestionLogRepository struct { - queries *postgres.Queries -} - -// NewIngestionLogRepository creates a new IngestionLogRepository. -func NewIngestionLogRepository(db postgres.DBTX) *IngestionLogRepository { - return &IngestionLogRepository{ - queries: postgres.New(db), - } -} - -// CreateIngestionLog creates a new ingestion log. -func (r *IngestionLogRepository) CreateIngestionLog(_ context.Context, _ *reconcilerpb.IngestionLog, _ []byte) error { - return errors.New("not implemented") -} - -// ChangeSetRepository allows interacting with change sets. -type ChangeSetRepository struct { - queries *postgres.Queries -} - -// NewChangeSetRepository creates a new ChangeSetRepository. -func NewChangeSetRepository(db postgres.DBTX) *ChangeSetRepository { - return &ChangeSetRepository{ - queries: postgres.New(db), - } -} - -// CreateChangeSet creates a new change set. -func (r *ChangeSetRepository) CreateChangeSet(_ context.Context, _ *reconcilerpb.ChangeSet) error { - return errors.New("not implemented") -} diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go new file mode 100644 index 00000000..adb4ec07 --- /dev/null +++ b/diode-server/dbstore/postgres/repository.go @@ -0,0 +1,255 @@ +package postgres + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" +) + +// Repository is an interface for interacting with ingestion logs and change sets. +type Repository struct { + pool *pgxpool.Pool + queries *postgres.Queries +} + +// NewRepository creates a new Repository. +func NewRepository(pool *pgxpool.Pool) *Repository { + return &Repository{ + pool: pool, + queries: postgres.New(pool), + } +} + +// CreateIngestionLog creates a new ingestion log. +func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { + entityJSON, err := protojson.Marshal(ingestionLog.Entity) + if err != nil { + return nil, fmt.Errorf("failed to marshal entity: %w", err) + } + params := postgres.CreateIngestionLogParams{ + ExternalID: ingestionLog.Id, + DataType: pgtype.Text{String: ingestionLog.DataType, Valid: true}, + State: pgtype.Int4{Int32: int32(ingestionLog.State), Valid: true}, + RequestID: pgtype.Text{String: ingestionLog.RequestId, Valid: true}, + IngestionTs: pgtype.Int8{Int64: ingestionLog.IngestionTs, Valid: true}, + ProducerAppName: pgtype.Text{String: ingestionLog.ProducerAppName, Valid: true}, + ProducerAppVersion: pgtype.Text{String: ingestionLog.ProducerAppVersion, Valid: true}, + SdkName: pgtype.Text{String: ingestionLog.SdkName, Valid: true}, + SdkVersion: pgtype.Text{String: ingestionLog.SdkVersion, Valid: true}, + Entity: entityJSON, + SourceMetadata: sourceMetadata, + } + + createdIngestionLog, err := r.queries.CreateIngestionLog(ctx, params) + if err != nil { + return nil, err + } + return &createdIngestionLog.ID, nil +} + +// UpdateIngestionLogStateWithError updates an ingestion log with a new state and error. +func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error { + params := postgres.UpdateIngestionLogStateWithErrorParams{ + ID: id, + State: pgtype.Int4{Int32: int32(state), Valid: true}, + } + + if ingestionError != nil { + ingestionErrJSON, err := json.Marshal(ingestionError) + if err != nil { + return fmt.Errorf("failed to marshal error: %w", err) + } + params.Error = ingestionErrJSON + } + return r.queries.UpdateIngestionLogStateWithError(ctx, params) +} + +// CountIngestionLogsPerState counts ingestion logs per state. +func (r *Repository) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) { + counts, err := r.queries.CountIngestionLogsPerState(ctx) + if err != nil { + return nil, err + } + + stateCounts := make(map[reconcilerpb.State]int32) + for _, stateCount := range counts { + stateCounts[reconcilerpb.State(stateCount.State.Int32)] = int32(stateCount.Count) + } + return stateCounts, nil +} + +// RetrieveIngestionLogs retrieves ingestion logs. +func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) { + params := postgres.RetrieveIngestionLogsWithChangeSetsParams{ + Limit: limit, + Offset: offset, + } + if filter.State != nil { + params.State = pgtype.Int4{Int32: int32(*filter.State), Valid: true} + } + if filter.DataType != "" { + params.DataType = pgtype.Text{String: filter.DataType, Valid: true} + } + if filter.IngestionTsStart > 0 { + params.IngestionTsStart = pgtype.Int8{Int64: filter.IngestionTsStart, Valid: true} + } + if filter.IngestionTsEnd > 0 { + params.IngestionTsEnd = pgtype.Int8{Int64: filter.IngestionTsEnd, Valid: true} + } + + rawIngestionLogs, err := r.queries.RetrieveIngestionLogsWithChangeSets(ctx, params) + if err != nil { + return nil, err + } + + ingestionLogs := make([]*reconcilerpb.IngestionLog, 0, len(rawIngestionLogs)) + for _, row := range rawIngestionLogs { + ingestionLog := row + entity := &diodepb.Entity{} + if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil { + return nil, fmt.Errorf("failed to unmarshal entity: %w", err) + } + var ingestionErr reconcilerpb.IngestionError + if ingestionLog.Error != nil { + if err := protojson.Unmarshal(ingestionLog.Error, &ingestionErr); err != nil { + return nil, fmt.Errorf("failed to unmarshal error: %w", err) + } + } + + log := &reconcilerpb.IngestionLog{ + Id: ingestionLog.ExternalID, + DataType: ingestionLog.DataType.String, + State: reconcilerpb.State(ingestionLog.State.Int32), + RequestId: ingestionLog.RequestID.String, + IngestionTs: ingestionLog.IngestionTs.Int64, + ProducerAppName: ingestionLog.ProducerAppName.String, + ProducerAppVersion: ingestionLog.ProducerAppVersion.String, + SdkName: ingestionLog.SdkName.String, + SdkVersion: ingestionLog.SdkVersion.String, + Entity: entity, + Error: &ingestionErr, + } + + if row.Changes != nil { + var dbChanges []postgres.Change + if err := json.Unmarshal(row.Changes, &dbChanges); err != nil { + return nil, fmt.Errorf("failed to unmarshal changes: %w", err) + } + + changes := make([]changeset.Change, 0, len(dbChanges)) + for _, dbChange := range dbChanges { + change := changeset.Change{ + ChangeID: dbChange.ExternalID, + ChangeType: dbChange.ChangeType, + ObjectType: dbChange.ObjectType, + Data: dbChange.Data, + } + + objID := int(dbChange.ObjectID.Int32) + if dbChange.ObjectID.Valid { + change.ObjectID = &objID + } + objVersion := int(dbChange.ObjectVersion.Int32) + if dbChange.ObjectVersion.Valid { + change.ObjectVersion = &objVersion + } + + changes = append(changes, change) + } + + changeSet := &changeset.ChangeSet{ + ChangeSetID: row.ChangeSet.ExternalID, + ChangeSet: changes, + } + + var compressedChangeSet []byte + if len(changes) > 0 { + b, err := changeset.CompressChangeSet(changeSet) + if err != nil { + return nil, fmt.Errorf("failed to compress change set: %w", err) + } + compressedChangeSet = b + } + + log.ChangeSet = &reconcilerpb.ChangeSet{ + Id: row.ChangeSet.ExternalID, + Data: compressedChangeSet, + } + } + + ingestionLogs = append(ingestionLogs, log) + } + + return ingestionLogs, nil +} + +// CreateChangeSet creates a new change set. +func (r *Repository) CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error) { + tx, err := r.pool.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("failed to start transaction: %w", err) + } + + rollback := func() { + if err := tx.Rollback(ctx); err != nil { + panic(fmt.Errorf("failed to rollback transaction: %w", err)) + } + } + + qtx := r.queries.WithTx(tx) + params := postgres.CreateChangeSetParams{ + ExternalID: changeSet.ChangeSetID, + IngestionLogID: ingestionLogID, + } + if changeSet.BranchID != nil { + params.BranchID = pgtype.Text{String: *changeSet.BranchID, Valid: true} + } + cs, err := qtx.CreateChangeSet(ctx, params) + if err != nil { + rollback() + return nil, fmt.Errorf("failed to create change set: %w", err) + } + + for i, change := range changeSet.ChangeSet { + dataJSON, err := json.Marshal(change.Data) + if err != nil { + rollback() + return nil, fmt.Errorf("failed to marshal entity: %w", err) + } + + changeParams := postgres.CreateChangeParams{ + ExternalID: change.ChangeID, + ChangeSetID: cs.ID, + ChangeType: change.ChangeType, + ObjectType: change.ObjectType, + Data: dataJSON, + SequenceNumber: pgtype.Int4{Int32: int32(i), Valid: true}, + } + if change.ObjectID != nil { + changeParams.ObjectID = pgtype.Int4{Int32: int32(*change.ObjectID), Valid: true} + } + if change.ObjectVersion != nil { + changeParams.ObjectVersion = pgtype.Int4{Int32: int32(*change.ObjectVersion), Valid: true} + } + + if _, err = qtx.CreateChange(ctx, changeParams); err != nil { + rollback() + return nil, fmt.Errorf("failed to create change: %w", err) + } + } + + if err := tx.Commit(ctx); err != nil { + rollback() + return nil, fmt.Errorf("failed to commit transaction: %w", err) + } + return &cs.ID, nil +} diff --git a/diode-server/gen/dbstore/postgres/change_sets.sql.go b/diode-server/gen/dbstore/postgres/change_sets.sql.go index 4b00f0c3..683e3fca 100644 --- a/diode-server/gen/dbstore/postgres/change_sets.sql.go +++ b/diode-server/gen/dbstore/postgres/change_sets.sql.go @@ -13,26 +13,26 @@ import ( const createChange = `-- name: CreateChange :one -INSERT INTO changes (change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data, +INSERT INTO changes (external_id, change_set_id, change_type, object_type, object_id, object_version, data, sequence_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) -RETURNING id, change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data, sequence_number, created_at, updated_at +RETURNING id, external_id, change_set_id, change_type, object_type, object_id, object_version, data, sequence_number, created_at, updated_at ` type CreateChangeParams struct { - ChangeKsuid string `json:"change_ksuid"` + ExternalID string `json:"external_id"` ChangeSetID int32 `json:"change_set_id"` ChangeType string `json:"change_type"` ObjectType string `json:"object_type"` ObjectID pgtype.Int4 `json:"object_id"` ObjectVersion pgtype.Int4 `json:"object_version"` - Data []byte `json:"data"` + Data any `json:"data"` SequenceNumber pgtype.Int4 `json:"sequence_number"` } func (q *Queries) CreateChange(ctx context.Context, arg CreateChangeParams) (Change, error) { row := q.db.QueryRow(ctx, createChange, - arg.ChangeKsuid, + arg.ExternalID, arg.ChangeSetID, arg.ChangeType, arg.ObjectType, @@ -44,7 +44,7 @@ func (q *Queries) CreateChange(ctx context.Context, arg CreateChangeParams) (Cha var i Change err := row.Scan( &i.ID, - &i.ChangeKsuid, + &i.ExternalID, &i.ChangeSetID, &i.ChangeType, &i.ObjectType, @@ -60,25 +60,25 @@ func (q *Queries) CreateChange(ctx context.Context, arg CreateChangeParams) (Cha const createChangeSet = `-- name: CreateChangeSet :one -INSERT INTO change_sets (change_set_ksuid, ingestion_log_id, branch_name) +INSERT INTO change_sets (external_id, ingestion_log_id, branch_id) VALUES ($1, $2, $3) -RETURNING id, change_set_ksuid, ingestion_log_id, branch_name, created_at, updated_at +RETURNING id, external_id, ingestion_log_id, branch_id, created_at, updated_at ` type CreateChangeSetParams struct { - ChangeSetKsuid string `json:"change_set_ksuid"` + ExternalID string `json:"external_id"` IngestionLogID int32 `json:"ingestion_log_id"` - BranchName pgtype.Text `json:"branch_name"` + BranchID pgtype.Text `json:"branch_id"` } func (q *Queries) CreateChangeSet(ctx context.Context, arg CreateChangeSetParams) (ChangeSet, error) { - row := q.db.QueryRow(ctx, createChangeSet, arg.ChangeSetKsuid, arg.IngestionLogID, arg.BranchName) + row := q.db.QueryRow(ctx, createChangeSet, arg.ExternalID, arg.IngestionLogID, arg.BranchID) var i ChangeSet err := row.Scan( &i.ID, - &i.ChangeSetKsuid, + &i.ExternalID, &i.IngestionLogID, - &i.BranchName, + &i.BranchID, &i.CreatedAt, &i.UpdatedAt, ) diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index 4ce844a0..da019b51 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -11,14 +11,46 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const countIngestionLogsPerState = `-- name: CountIngestionLogsPerState :many +SELECT state, COUNT(*) AS count +FROM ingestion_logs +GROUP BY state +` + +type CountIngestionLogsPerStateRow struct { + State pgtype.Int4 `json:"state"` + Count int64 `json:"count"` +} + +func (q *Queries) CountIngestionLogsPerState(ctx context.Context) ([]CountIngestionLogsPerStateRow, error) { + rows, err := q.db.Query(ctx, countIngestionLogsPerState) + if err != nil { + return nil, err + } + defer rows.Close() + var items []CountIngestionLogsPerStateRow + for rows.Next() { + var i CountIngestionLogsPerStateRow + if err := rows.Scan(&i.State, &i.Count); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const createIngestionLog = `-- name: CreateIngestionLog :one -INSERT INTO ingestion_logs (ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name, +INSERT INTO ingestion_logs (external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, source_metadata) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) +RETURNING id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at ` type CreateIngestionLogParams struct { - IngestionLogKsuid string `json:"ingestion_log_ksuid"` + ExternalID string `json:"external_id"` DataType pgtype.Text `json:"data_type"` State pgtype.Int4 `json:"state"` RequestID pgtype.Text `json:"request_id"` @@ -33,7 +65,7 @@ type CreateIngestionLogParams struct { func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLogParams) (IngestionLog, error) { row := q.db.QueryRow(ctx, createIngestionLog, - arg.IngestionLogKsuid, + arg.ExternalID, arg.DataType, arg.State, arg.RequestID, @@ -48,7 +80,7 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog var i IngestionLog err := row.Scan( &i.ID, - &i.IngestionLogKsuid, + &i.ExternalID, &i.DataType, &i.State, &i.RequestID, @@ -65,3 +97,152 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog ) return i, err } + +const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many +SELECT id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +FROM ingestion_logs +WHERE (state = $1 OR $1 IS NULL) + AND (data_type = $2 OR $2 IS NULL) + AND (ingestion_ts >= $3 OR $3 IS NULL) + AND (ingestion_ts <= $4 OR $4 IS NULL) +ORDER BY id DESC +LIMIT $6 OFFSET $5 +` + +type RetrieveIngestionLogsParams struct { + State pgtype.Int4 `json:"state"` + DataType pgtype.Text `json:"data_type"` + IngestionTsStart pgtype.Int8 `json:"ingestion_ts_start"` + IngestionTsEnd pgtype.Int8 `json:"ingestion_ts_end"` + Offset int32 `json:"offset"` + Limit int32 `json:"limit"` +} + +func (q *Queries) RetrieveIngestionLogs(ctx context.Context, arg RetrieveIngestionLogsParams) ([]IngestionLog, error) { + rows, err := q.db.Query(ctx, retrieveIngestionLogs, + arg.State, + arg.DataType, + arg.IngestionTsStart, + arg.IngestionTsEnd, + arg.Offset, + arg.Limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []IngestionLog + for rows.Next() { + var i IngestionLog + if err := rows.Scan( + &i.ID, + &i.ExternalID, + &i.DataType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const retrieveIngestionLogsWithChangeSets = `-- name: RetrieveIngestionLogsWithChangeSets :many +SELECT v_ingestion_logs_with_change_set.id, v_ingestion_logs_with_change_set.external_id, v_ingestion_logs_with_change_set.data_type, v_ingestion_logs_with_change_set.state, v_ingestion_logs_with_change_set.request_id, v_ingestion_logs_with_change_set.ingestion_ts, v_ingestion_logs_with_change_set.producer_app_name, v_ingestion_logs_with_change_set.producer_app_version, v_ingestion_logs_with_change_set.sdk_name, v_ingestion_logs_with_change_set.sdk_version, v_ingestion_logs_with_change_set.entity, v_ingestion_logs_with_change_set.error, v_ingestion_logs_with_change_set.source_metadata, v_ingestion_logs_with_change_set.created_at, v_ingestion_logs_with_change_set.updated_at, v_ingestion_logs_with_change_set.change_set, v_ingestion_logs_with_change_set.changes +FROM v_ingestion_logs_with_change_set +WHERE (v_ingestion_logs_with_change_set.state = $1 OR $1 IS NULL) + AND (v_ingestion_logs_with_change_set.data_type = $2 OR $2 IS NULL) + AND (v_ingestion_logs_with_change_set.ingestion_ts >= $3 OR + $3 IS NULL) + AND (v_ingestion_logs_with_change_set.ingestion_ts <= $4 OR + $4 IS NULL) +ORDER BY v_ingestion_logs_with_change_set.id DESC +LIMIT $6 OFFSET $5 +` + +type RetrieveIngestionLogsWithChangeSetsParams struct { + State pgtype.Int4 `json:"state"` + DataType pgtype.Text `json:"data_type"` + IngestionTsStart pgtype.Int8 `json:"ingestion_ts_start"` + IngestionTsEnd pgtype.Int8 `json:"ingestion_ts_end"` + Offset int32 `json:"offset"` + Limit int32 `json:"limit"` +} + +func (q *Queries) RetrieveIngestionLogsWithChangeSets(ctx context.Context, arg RetrieveIngestionLogsWithChangeSetsParams) ([]VIngestionLogsWithChangeSet, error) { + rows, err := q.db.Query(ctx, retrieveIngestionLogsWithChangeSets, + arg.State, + arg.DataType, + arg.IngestionTsStart, + arg.IngestionTsEnd, + arg.Offset, + arg.Limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []VIngestionLogsWithChangeSet + for rows.Next() { + var i VIngestionLogsWithChangeSet + if err := rows.Scan( + &i.ID, + &i.ExternalID, + &i.DataType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + &i.ChangeSet, + &i.Changes, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateIngestionLogStateWithError = `-- name: UpdateIngestionLogStateWithError :exec +UPDATE ingestion_logs +SET state = $2, + error = $3 +WHERE id = $1 +RETURNING id, external_id, data_type, state, request_id, ingestion_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +` + +type UpdateIngestionLogStateWithErrorParams struct { + ID int32 `json:"id"` + State pgtype.Int4 `json:"state"` + Error []byte `json:"error"` +} + +func (q *Queries) UpdateIngestionLogStateWithError(ctx context.Context, arg UpdateIngestionLogStateWithErrorParams) error { + _, err := q.db.Exec(ctx, updateIngestionLogStateWithError, arg.ID, arg.State, arg.Error) + return err +} diff --git a/diode-server/gen/dbstore/postgres/types.go b/diode-server/gen/dbstore/postgres/types.go index 9a9489a3..cf331d3b 100644 --- a/diode-server/gen/dbstore/postgres/types.go +++ b/diode-server/gen/dbstore/postgres/types.go @@ -10,13 +10,13 @@ import ( type Change struct { ID int32 `json:"id"` - ChangeKsuid string `json:"change_ksuid"` + ExternalID string `json:"external_id"` ChangeSetID int32 `json:"change_set_id"` ChangeType string `json:"change_type"` ObjectType string `json:"object_type"` ObjectID pgtype.Int4 `json:"object_id"` ObjectVersion pgtype.Int4 `json:"object_version"` - Data []byte `json:"data"` + Data any `json:"data"` SequenceNumber pgtype.Int4 `json:"sequence_number"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` @@ -24,16 +24,16 @@ type Change struct { type ChangeSet struct { ID int32 `json:"id"` - ChangeSetKsuid string `json:"change_set_ksuid"` + ExternalID string `json:"external_id"` IngestionLogID int32 `json:"ingestion_log_id"` - BranchName pgtype.Text `json:"branch_name"` + BranchID pgtype.Text `json:"branch_id"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` } type IngestionLog struct { ID int32 `json:"id"` - IngestionLogKsuid string `json:"ingestion_log_ksuid"` + ExternalID string `json:"external_id"` DataType pgtype.Text `json:"data_type"` State pgtype.Int4 `json:"state"` RequestID pgtype.Text `json:"request_id"` @@ -48,3 +48,23 @@ type IngestionLog struct { CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` } + +type VIngestionLogsWithChangeSet struct { + ID int32 `json:"id"` + ExternalID string `json:"external_id"` + DataType pgtype.Text `json:"data_type"` + State pgtype.Int4 `json:"state"` + RequestID pgtype.Text `json:"request_id"` + IngestionTs pgtype.Int8 `json:"ingestion_ts"` + ProducerAppName pgtype.Text `json:"producer_app_name"` + ProducerAppVersion pgtype.Text `json:"producer_app_version"` + SdkName pgtype.Text `json:"sdk_name"` + SdkVersion pgtype.Text `json:"sdk_version"` + Entity []byte `json:"entity"` + Error []byte `json:"error"` + SourceMetadata []byte `json:"source_metadata"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + ChangeSet ChangeSet `json:"change_set"` + Changes []byte `json:"changes"` +} diff --git a/diode-server/go.mod b/diode-server/go.mod index ba08939f..65e68b2f 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -18,7 +18,6 @@ require ( github.com/oklog/run v1.1.0 github.com/pressly/goose/v3 v3.23.0 github.com/redis/go-redis/v9 v9.5.1 - github.com/segmentio/ksuid v1.0.4 github.com/stretchr/testify v1.9.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.64.1 diff --git a/diode-server/go.sum b/diode-server/go.sum index 272ba161..a1deba33 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -80,8 +80,6 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= -github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= -github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE= github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -99,10 +97,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= -golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= +golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= @@ -114,8 +110,6 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= -golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 h1:mxSlqyb8ZAHsYDCfiXN1EDdNTdvjUJSLY+OnAUtYNYA= diff --git a/diode-server/ingester/component_test.go b/diode-server/ingester/component_test.go index bbe24ab7..edc8b805 100644 --- a/diode-server/ingester/component_test.go +++ b/diode-server/ingester/component_test.go @@ -18,6 +18,7 @@ import ( pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" "github.com/netboxlabs/diode/diode-server/ingester" "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) func getFreePort() (string, error) { @@ -70,7 +71,8 @@ const bufSize = 1024 * 1024 func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - server, err := reconciler.NewServer(ctx, logger) + mockRepository := mocks.NewRepository(t) + server, err := reconciler.NewServer(ctx, logger, mockRepository) require.NoError(t, err) errChan := make(chan error, 1) diff --git a/diode-server/netboxdiodeplugin/mocks/netboxapi.go b/diode-server/netboxdiodeplugin/mocks/netboxapi.go index fb7ab9c2..48b610a1 100644 --- a/diode-server/netboxdiodeplugin/mocks/netboxapi.go +++ b/diode-server/netboxdiodeplugin/mocks/netboxapi.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.49.1. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks diff --git a/diode-server/reconciler/changeset/changeset.go b/diode-server/reconciler/changeset/changeset.go index 7a79170c..99975317 100644 --- a/diode-server/reconciler/changeset/changeset.go +++ b/diode-server/reconciler/changeset/changeset.go @@ -1,5 +1,13 @@ package changeset +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/andybalholm/brotli" +) + const ( // ChangeTypeCreate is the change type for a creation ChangeTypeCreate = "create" @@ -12,6 +20,7 @@ const ( type ChangeSet struct { ChangeSetID string `json:"change_set_id"` ChangeSet []Change `json:"change_set"` + BranchID *string `json:"branch_id,omitempty"` } // Change represents a change for the change set @@ -23,3 +32,22 @@ type Change struct { ObjectVersion *int `json:"object_version,omitempty"` Data any `json:"data"` } + +// CompressChangeSet compresses a change set +func CompressChangeSet(cs *ChangeSet) ([]byte, error) { + csJSON, err := json.Marshal(cs) + if err != nil { + return nil, fmt.Errorf("failed to marshal change set JSON: %v", err) + } + + var brotliBuf bytes.Buffer + brotliWriter := brotli.NewWriter(&brotliBuf) + if _, err = brotliWriter.Write(csJSON); err != nil { + return nil, fmt.Errorf("failed to compress change set: %v", err) + } + if err = brotliWriter.Close(); err != nil { + return nil, fmt.Errorf("failed to compress change set: %v", err) + } + + return brotliBuf.Bytes(), nil +} diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 6b499149..09a8914b 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -1,22 +1,17 @@ package reconciler import ( - "bytes" "context" - "encoding/json" "errors" "fmt" "log/slog" "os" - "regexp" "strconv" - "github.com/andybalholm/brotli" + "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "github.com/redis/go-redis/v9" - "github.com/segmentio/ksuid" "golang.org/x/time/rate" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" @@ -57,26 +52,26 @@ type RedisClient interface { // IngestionProcessor processes ingested data type IngestionProcessor struct { - Config Config - logger *slog.Logger - hostname string - redisClient RedisClient - redisStreamClient RedisClient - nbClient netboxdiodeplugin.NetBoxAPI - ingestionLogRepository IngestionLogRepository - changeSetRepository ChangeSetRepository + Config Config + logger *slog.Logger + hostname string + redisClient RedisClient + redisStreamClient RedisClient + nbClient netboxdiodeplugin.NetBoxAPI + repository Repository } // IngestionLogToProcess represents an ingestion log to process type IngestionLogToProcess struct { - key string - ingestionLog *reconcilerpb.IngestionLog - changeSet *changeset.ChangeSet - errors []error + ingestionLogID int32 + ingestionLog *reconcilerpb.IngestionLog + changeSetID int32 + changeSet *changeset.ChangeSet + errors []error } // NewIngestionProcessor creates a new ingestion processor -func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ingestionLogRepo IngestionLogRepository, changeSetRepo ChangeSetRepository) (*IngestionProcessor, error) { +func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, repository Repository) (*IngestionProcessor, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -111,14 +106,13 @@ func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ingestionLo } component := &IngestionProcessor{ - Config: cfg, - logger: logger, - hostname: hostname, - redisClient: redisClient, - redisStreamClient: redisStreamClient, - nbClient: nbClient, - ingestionLogRepository: ingestionLogRepo, - changeSetRepository: changeSetRepo, + Config: cfg, + logger: logger, + hostname: hostname, + redisClient: redisClient, + redisStreamClient: redisStreamClient, + nbClient: nbClient, + repository: repository, } return component, nil @@ -132,13 +126,6 @@ func (p *IngestionProcessor) Name() string { // Start starts the component func (p *IngestionProcessor) Start(ctx context.Context) error { p.logger.Info("starting component", "name", p.Name()) - - if p.Config.MigrationEnabled { - if err := migrate(ctx, p.logger, p.redisClient); err != nil { - return fmt.Errorf("failed to migrate: %v", err) - } - } - return p.consumeIngestionStream(ctx, redisStreamID, redisConsumerGroup, fmt.Sprintf("%s-%s", redisConsumerGroup, p.hostname)) } @@ -265,7 +252,7 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan case <-ctx.Done(): p.logger.Debug("context cancelled", "error", ctx.Err()) return - case ingestionLog, ok := <-generateChangeSetChan: + case msg, ok := <-generateChangeSetChan: if !ok { return } @@ -274,13 +261,11 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan return } - p.logger.Debug("generating change set", "ingestionLogID", ingestionLog.ingestionLog.GetId()) - ingestEntity := differ.IngestEntity{ - RequestID: ingestionLog.ingestionLog.GetId(), - DataType: ingestionLog.ingestionLog.GetDataType(), - Entity: ingestionLog.ingestionLog.GetEntity(), - State: int(ingestionLog.ingestionLog.GetState()), + RequestID: msg.ingestionLog.GetId(), + DataType: msg.ingestionLog.GetDataType(), + Entity: msg.ingestionLog.GetEntity(), + State: int(msg.ingestionLog.GetState()), } changeSet, err := differ.Diff(ctx, ingestEntity, "", p.nbClient) @@ -293,56 +278,40 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan "data_type": ingestEntity.DataType, } sentry.CaptureError(err, tags, "Ingest Entity", contextMap) - p.logger.Debug("failed to prepare change set", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "error", err) + p.logger.Debug("failed to prepare change set", "ingestionLogID", msg.ingestionLog.GetId(), "error", err) - ingestionLog.errors = append(ingestionLog.errors, fmt.Errorf("failed to prepare change set: %v", err)) + msg.errors = append(msg.errors, fmt.Errorf("failed to prepare change set: %v", err)) - ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED - ingestionLog.ingestionLog.Error = extractIngestionError(err) + ingestionErr := extractIngestionError(err) - if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) + if err = p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { + msg.errors = append(msg.errors, err) } break } - ingestionLog.changeSet = changeSet - - if len(changeSet.ChangeSet) > 0 { - csCompressed, err := compressChangeSet(changeSet) - if err != nil { - ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED - ingestionLog.errors = append(ingestionLog.errors, err) - - if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) - } - break - } + msg.changeSet = changeSet - ingestionLog.ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{ - Id: changeSet.ChangeSetID, - Data: csCompressed, - } - - if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) - } + id, err := p.repository.CreateChangeSet(ctx, *changeSet, msg.ingestionLogID) + if err != nil { + msg.errors = append(msg.errors, fmt.Errorf("failed to create change set: %v", err)) + } + if len(changeSet.ChangeSet) > 0 { if applyChangeSetChan != nil { applyChangeSetChan <- IngestionLogToProcess{ - key: ingestionLog.key, - ingestionLog: ingestionLog.ingestionLog, - changeSet: ingestionLog.changeSet, + ingestionLogID: msg.ingestionLogID, + ingestionLog: msg.ingestionLog, + changeSetID: *id, + changeSet: msg.changeSet, } } } else { - ingestionLog.ingestionLog.State = reconcilerpb.State_NO_CHANGES - if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) + if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil { + msg.errors = append(msg.errors, err) } } - p.logger.Debug("change set generated", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) + p.logger.Debug("change set generated", "id", id, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID) } } }() @@ -364,7 +333,7 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha case <-ctx.Done(): p.logger.Debug("context cancelled", "error", ctx.Err()) return - case ingestionLog, ok := <-applyChan: + case msg, ok := <-applyChan: if !ok { return } @@ -373,26 +342,23 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha return } - p.logger.Debug("applying change set", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) - - if err := applier.ApplyChangeSet(ctx, p.logger, *ingestionLog.changeSet, "", p.nbClient); err != nil { - p.logger.Debug("failed to apply change set", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID, "error", err) - ingestionLog.errors = append(ingestionLog.errors, fmt.Errorf("failed to apply chang eset: %v", err)) + if err := applier.ApplyChangeSet(ctx, p.logger, *msg.changeSet, "", p.nbClient); err != nil { + p.logger.Debug("failed to apply change set", "id", msg.changeSetID, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID, "error", err) + msg.errors = append(msg.errors, fmt.Errorf("failed to apply change set: %v", err)) - ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED - ingestionLog.ingestionLog.Error = extractIngestionError(err) + ingestionErr := extractIngestionError(err) - if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) + if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err != nil { + msg.errors = append(msg.errors, err) } break } - ingestionLog.ingestionLog.State = reconcilerpb.State_RECONCILED - if _, err := p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { - ingestionLog.errors = append(ingestionLog.errors, err) + msg.ingestionLog.State = reconcilerpb.State_RECONCILED + if err := p.repository.UpdateIngestionLogStateWithError(ctx, msg.ingestionLogID, reconcilerpb.State_RECONCILED, nil); err != nil { + msg.errors = append(msg.errors, err) } - p.logger.Debug("change set applied", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) + p.logger.Debug("change set applied", "id", msg.changeSetID, "externalID", msg.changeSet.ChangeSetID, "ingestionLogID", msg.ingestionLogID) } } }() @@ -416,13 +382,8 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq continue } - ingestionLogID := ksuid.New().String() - - key := fmt.Sprintf("ingest-entity:%s-%d-%s", objectType, ingestionTs, ingestionLogID) - p.logger.Debug("ingest entity key", "key", key) - ingestionLog := &reconcilerpb.IngestionLog{ - Id: ingestionLogID, + Id: uuid.NewString(), RequestId: ingestReq.GetId(), ProducerAppName: ingestReq.GetProducerAppName(), ProducerAppVersion: ingestReq.GetProducerAppVersion(), @@ -434,14 +395,16 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq State: reconcilerpb.State_QUEUED, } - if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil { - errs = append(errs, fmt.Errorf("failed to write JSON: %v", err)) + id, err := p.repository.CreateIngestionLog(ctx, ingestionLog, nil) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create ingestion log: %v", err)) continue } + p.logger.Debug("created ingestion log", "id", id, "externalID", ingestionLog.GetId()) generateIngestionLogChan <- IngestionLogToProcess{ - key: key, - ingestionLog: ingestionLog, + ingestionLogID: *id, + ingestionLog: ingestionLog, } } @@ -465,45 +428,6 @@ func extractIngestionError(err error) *reconcilerpb.IngestionError { return ingestionErr } -func (p *IngestionProcessor) writeIngestionLog(ctx context.Context, key string, ingestionLog *reconcilerpb.IngestionLog) ([]byte, error) { - ingestionLogJSON, err := protojson.Marshal(ingestionLog) - if err != nil { - return nil, fmt.Errorf("failed to marshal JSON: %v", err) - } - - ingestionLogJSON = normalizeIngestionLog(ingestionLogJSON) - - if _, err := p.redisClient.Do(ctx, "JSON.SET", key, "$", ingestionLogJSON).Result(); err != nil { - return nil, fmt.Errorf("failed to set JSON redis key: %v", err) - } - - return ingestionLogJSON, nil -} - -func normalizeIngestionLog(l []byte) []byte { - // replace ingestionTs string value as integer, see: https://github.com/golang/protobuf/issues/1414 - re := regexp.MustCompile(`"ingestionTs":"(\d+)"`) - return re.ReplaceAll(l, []byte(`"ingestionTs":$1`)) -} - -func compressChangeSet(cs *changeset.ChangeSet) ([]byte, error) { - csJSON, err := json.Marshal(cs) - if err != nil { - return nil, fmt.Errorf("failed to marshal change set JSON: %v", err) - } - - var brotliBuf bytes.Buffer - brotliWriter := brotli.NewWriter(&brotliBuf) - if _, err = brotliWriter.Write(csJSON); err != nil { - return nil, fmt.Errorf("failed to compress change set: %v", err) - } - if err = brotliWriter.Close(); err != nil { - return nil, fmt.Errorf("failed to compress change set: %v", err) - } - - return brotliBuf.Bytes(), nil -} - func extractObjectType(in *diodepb.Entity) (string, error) { switch in.GetEntity().(type) { case *diodepb.Entity_Device: diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index 11cd8c65..69328610 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "io" "log/slog" "os" @@ -13,8 +12,8 @@ import ( "time" "github.com/andybalholm/brotli" + "github.com/google/uuid" "github.com/redis/go-redis/v9" - "github.com/segmentio/ksuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -28,92 +27,9 @@ import ( mr "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) +func int32Ptr(i int32) *int32 { return &i } func strPtr(s string) *string { return &s } -func TestWriteIngestionLog(t *testing.T) { - tests := []struct { - name string - ingestionLog *reconcilerpb.IngestionLog - hasError bool - hasMock bool - }{ - { - name: "write successful", - ingestionLog: &reconcilerpb.IngestionLog{ - RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", - DataType: "dcim.site", - Entity: &diodepb.Entity{ - Entity: &diodepb.Entity_Site{ - Site: &diodepb.Site{ - Name: "Site A", - }, - }, - }, - }, - hasError: false, - hasMock: true, - }, - { - name: "redis error", - ingestionLog: &reconcilerpb.IngestionLog{ - RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", - DataType: "dcim.site", - Entity: &diodepb.Entity{ - Entity: &diodepb.Entity_Site{ - Site: &diodepb.Site{ - Name: "Site A", - }, - }, - }, - IngestionTs: time.Now().UnixNano(), - }, - hasError: true, - hasMock: true, - }, - } - for i := range tests { - tt := tests[i] - - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - key := "test-key" - - // Create a mock Redis client - mockRedisClient := new(mr.RedisClient) - p := &IngestionProcessor{ - redisClient: mockRedisClient, - logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})), - Config: Config{ - AutoApplyChangesets: true, - ReconcilerRateLimiterRPS: 20, - ReconcilerRateLimiterBurst: 1, - }, - } - - // Set up the mock expectation - cmd := redis.NewCmd(ctx) - if tt.hasError { - cmd.SetErr(errors.New("error")) - } - mockRedisClient.On("Do", ctx, "JSON.SET", "test-key", "$", mock.Anything). - Return(cmd) - - // Call the method - _, err := p.writeIngestionLog(ctx, key, tt.ingestionLog) - - if tt.hasError { - require.Error(t, err) - } else { - require.NoError(t, err) - } - // Assert the expectations - if tt.hasMock { - mockRedisClient.AssertExpectations(t) - } - }) - } -} - func TestHandleStreamMessage(t *testing.T) { tests := []struct { name string @@ -235,6 +151,7 @@ func TestHandleStreamMessage(t *testing.T) { mockRedisClient := new(mr.RedisClient) mockRedisStreamClient := new(mr.RedisClient) mockNbClient := new(mnp.NetBoxAPI) + mockRepository := new(mr.Repository) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) p := &IngestionProcessor{ @@ -247,6 +164,7 @@ func TestHandleStreamMessage(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, + repository: mockRepository, } request := redis.XMessage{} @@ -287,7 +205,7 @@ func TestHandleStreamMessage(t *testing.T) { } mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(tt.changeSetResponse, tt.changeSetError) if tt.entities[0].Entity != nil { - mockRedisClient.On("Do", ctx, "JSON.SET", mock.Anything, "$", mock.Anything).Return(redis.NewCmd(ctx)) + mockRepository.On("CreateIngestionLog", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) } mockRedisStreamClient.On("XAck", ctx, mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) mockRedisStreamClient.On("XDel", ctx, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) @@ -300,7 +218,7 @@ func TestHandleStreamMessage(t *testing.T) { } if tt.validMsg { - mockRedisClient.AssertExpectations(t) + mockRepository.AssertExpectations(t) } }) } @@ -423,7 +341,7 @@ func TestCompressChangeSet(t *testing.T) { }, }, } - compressed, err := compressChangeSet(&cs) + compressed, err := changeset.CompressChangeSet(&cs) require.NoError(t, err) // Decompress the compressed data @@ -454,7 +372,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { { name: "generate and apply change set", ingestionLog: &reconcilerpb.IngestionLog{ - Id: ksuid.New().String(), + Id: uuid.NewString(), RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", ProducerAppName: "test-app", ProducerAppVersion: "0.1.0", @@ -489,7 +407,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { { name: "generate change set only", ingestionLog: &reconcilerpb.IngestionLog{ - Id: ksuid.New().String(), + Id: uuid.NewString(), RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", ProducerAppName: "test-app", ProducerAppVersion: "0.1.0", @@ -524,6 +442,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { ctx := context.Background() mockRedisClient := new(mr.RedisClient) mockNbClient := new(mnp.NetBoxAPI) + mockRepository := new(mr.Repository) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) p := &IngestionProcessor{ @@ -535,21 +454,17 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, + repository: mockRepository, } - // Set up the mock expectation - cmd := redis.NewCmd(ctx) - if tt.expectedError { - cmd.SetErr(errors.New("error")) - } - redisKey := fmt.Sprintf("ingest-entity:%s-%d-%s", tt.ingestionLog.DataType, tt.ingestionLog.IngestionTs, tt.ingestionLog.Id) - mockRedisClient.On("Do", ctx, "JSON.SET", redisKey, "$", mock.Anything). - Return(cmd) + ingestionLogID := int32(1) mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(tt.mockRetrieveObjectStateResponse, nil) if tt.autoApplyChangesets { + mockRepository.On("UpdateIngestionLogStateWithError", ctx, ingestionLogID, tt.expectedStatus, mock.Anything).Return(nil) mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(tt.mockApplyChangeSetResponse, nil) } + mockRepository.On("CreateChangeSet", ctx, mock.Anything, ingestionLogID).Return(int32Ptr(1), nil) bufCapacity := 1 @@ -567,8 +482,8 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { } generateChangeSetChannel <- IngestionLogToProcess{ - key: redisKey, - ingestionLog: tt.ingestionLog, + ingestionLogID: ingestionLogID, + ingestionLog: tt.ingestionLog, } close(generateChangeSetChannel) @@ -577,8 +492,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { <-applyChangeSetDone } - mockRedisClient.AssertExpectations(t) - require.NotNil(t, tt.ingestionLog.ChangeSet) + mockRepository.AssertExpectations(t) require.Equal(t, tt.expectedStatus, tt.ingestionLog.State) }) } diff --git a/diode-server/reconciler/ingestion_processor_test.go b/diode-server/reconciler/ingestion_processor_test.go index 15568019..698b386b 100644 --- a/diode-server/reconciler/ingestion_processor_test.go +++ b/diode-server/reconciler/ingestion_processor_test.go @@ -10,6 +10,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -18,6 +19,8 @@ import ( "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) +func int32Ptr(i int32) *int32 { return &i } + func TestNewIngestionProcessor(t *testing.T) { ctx := context.Background() s := miniredis.RunT(t) @@ -26,11 +29,10 @@ func TestNewIngestionProcessor(t *testing.T) { setupEnv(s.Addr()) defer teardownEnv() - ingestionLogRepoMock := mocks.NewIngestionLogRepository(t) - changeSetRepoMock := mocks.NewChangeSetRepository(t) + mockRepository := mocks.NewRepository(t) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - processor, err := reconciler.NewIngestionProcessor(ctx, logger, ingestionLogRepoMock, changeSetRepoMock) + processor, err := reconciler.NewIngestionProcessor(ctx, logger, mockRepository) require.NoError(t, err) require.NotNil(t, processor) @@ -46,13 +48,12 @@ func TestIngestionProcessorStart(t *testing.T) { setupEnv(s.Addr()) defer teardownEnv() - ingestionLogRepoMock := mocks.NewIngestionLogRepository(t) - changeSetRepoMock := mocks.NewChangeSetRepository(t) + mockRepository := mocks.NewRepository(t) logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) ctx := context.Background() - processor, err := reconciler.NewIngestionProcessor(ctx, logger, ingestionLogRepoMock, changeSetRepoMock) + processor, err := reconciler.NewIngestionProcessor(ctx, logger, mockRepository) require.NoError(t, err) require.NotNil(t, processor) @@ -222,6 +223,9 @@ func TestIngestionProcessorStart(t *testing.T) { // Wait server time.Sleep(50 * time.Millisecond) + mockRepository.On("CreateIngestionLog", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("UpdateIngestionLogStateWithError", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + redisClient := redis.NewClient(&redis.Options{ Addr: s.Addr(), DB: 1, @@ -244,4 +248,5 @@ func TestIngestionProcessorStart(t *testing.T) { // Stop the processor err = processor.Stop() assert.NoError(t, err) + mockRepository.AssertExpectations(t) } diff --git a/diode-server/reconciler/logs_retriever.go b/diode-server/reconciler/logs_retriever.go index 343000fa..6abffe25 100644 --- a/diode-server/reconciler/logs_retriever.go +++ b/diode-server/reconciler/logs_retriever.go @@ -5,196 +5,71 @@ import ( "context" "encoding/base64" "encoding/binary" - "encoding/json" "fmt" "log/slog" - "strings" - - "github.com/redis/go-redis/v9" - "google.golang.org/protobuf/encoding/protojson" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" ) -type extraAttributesWrapper struct { - ExtraAttributes string `json:"$"` - IngestionTs string `json:"ingestion_ts"` -} -type redisLogResult struct { - ExtraAttributes extraAttributesWrapper `json:"extra_attributes"` - ID string `json:"id"` -} - -type redisLogsResponse struct { - Results []redisLogResult `json:"results"` - TotalResults int32 `json:"total_results"` -} +func retrieveIngestionMetrics(ctx context.Context, repository Repository) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { + var metrics reconcilerpb.IngestionMetrics -func convertMapInterface(data interface{}) interface{} { - switch v := data.(type) { - case map[interface{}]interface{}: - converted := make(map[string]interface{}) - for key, value := range v { - converted[fmt.Sprintf("%v", key)] = convertMapInterface(value) // Recursive conversion for nested maps - } - return converted - case []interface{}: - // If the value is a slice, apply conversion recursively to each element - for i, value := range v { - v[i] = convertMapInterface(value) + ingestionLogsPerState, err := repository.CountIngestionLogsPerState(ctx) + if err != nil { + return nil, err + } + + var totalIngestionLogs int32 + + for state, count := range ingestionLogsPerState { + totalIngestionLogs += count + switch state { + case reconcilerpb.State_QUEUED: + metrics.Queued = count + case reconcilerpb.State_RECONCILED: + metrics.Reconciled = count + case reconcilerpb.State_FAILED: + metrics.Failed = count + case reconcilerpb.State_NO_CHANGES: + metrics.NoChanges = count } - return v - default: - return v } -} - -func encodeIntToBase64(num int32) string { - // Create a buffer to hold the binary representation - buf := new(bytes.Buffer) - // Write the int value as a binary value into the buffer - if err := binary.Write(buf, binary.BigEndian, num); err != nil { - fmt.Println("error writing binary:", err) - } + metrics.Total = totalIngestionLogs - // Encode the binary data to base64 - return base64.StdEncoding.EncodeToString(buf.Bytes()) + return &reconcilerpb.RetrieveIngestionLogsResponse{Metrics: &metrics}, nil } -func decodeBase64ToInt(encoded string) (int32, error) { - // Decode the base64 string back to bytes - decoded, err := base64.StdEncoding.DecodeString(encoded) +func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, repository Repository, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { + ingestionLogsMetricsResponse, err := retrieveIngestionMetrics(ctx, repository) if err != nil { - return 0, err + return nil, err } - // Convert the byte slice back to int64 - buf := bytes.NewReader(decoded) - var num int32 - if err := binary.Read(buf, binary.BigEndian, &num); err != nil { - return 0, err - } - - return num, nil -} - -func retrieveIngestionMetrics(ctx context.Context, client RedisClient) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { - pipe := client.Pipeline() - - results := []*redis.Cmd{ - pipe.Do(ctx, "FT.SEARCH", "ingest-entity", "*", "LIMIT", 0, 0), - } - for s := reconcilerpb.State_QUEUED; s <= reconcilerpb.State_NO_CHANGES; s++ { - stateName, ok := reconcilerpb.State_name[int32(s)] - if !ok { - return nil, fmt.Errorf("failed to retrieve ingestion logs: failed to get state name of %d", s) - } - stateName = escapeSpecialChars(stateName) - results = append(results, pipe.Do(ctx, "FT.SEARCH", "ingest-entity", fmt.Sprintf("@state:{%s}", stateName), "LIMIT", 0, 0)) - } - - if _, err := pipe.Exec(ctx); err != nil { - return nil, fmt.Errorf("failed to retrieve ingestion logs: %w", err) - } - - var metrics reconcilerpb.IngestionMetrics - - for q := range results { - res, err := results[q].Result() - if err != nil { - return nil, fmt.Errorf("failed to retrieve ingestion logs: %w", err) - } - - conv := convertMapInterface(res) - totalRes, ok := conv.(map[string]interface{})["total_results"].(int64) - if !ok { - return nil, fmt.Errorf("failed to retrieve ingestion logs: failed to parse total_results") - } - total := int32(totalRes) - if q == int(reconcilerpb.State_QUEUED) { - metrics.Queued = total - } else if q == int(reconcilerpb.State_RECONCILED) { - metrics.Reconciled = total - } else if q == int(reconcilerpb.State_FAILED) { - metrics.Failed = total - } else if q == int(reconcilerpb.State_NO_CHANGES) { - metrics.NoChanges = total - } else { - metrics.Total = total - } - } - return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: nil, Metrics: &metrics, NextPageToken: ""}, nil -} - -func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client RedisClient, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { if in.GetOnlyMetrics() { logger.Debug("retrieving only ingestion metrics") - return retrieveIngestionMetrics(ctx, client) + return ingestionLogsMetricsResponse, nil } pageSize := in.GetPageSize() if in.PageSize == nil || pageSize >= 1000 { - pageSize = 100 // Default to 100 + pageSize = 100 } - query := buildQueryFilter(in) - - // Construct the base FT.SEARCH query - queryArgs := []interface{}{ - "FT.SEARCH", - "ingest-entity", // Index name - query, - } - - // Apply sorting by id in descending order - queryArgs = append(queryArgs, "SORTBY", "id", "DESC") - - var err error - - // Apply limit for pagination - var offset int32 + offset := int32(0) if in.PageToken != "" { - offset, err = decodeBase64ToInt(in.PageToken) + decodedPageToken, err := decodeBase64ToInt(in.PageToken) if err != nil { - logger.Warn("error decoding page token", "error", err) + return nil, err } + offset = decodedPageToken } - queryArgs = append(queryArgs, "LIMIT", offset, pageSize) - - logger.Debug("retrieving ingestion logs", "query", queryArgs) - // Execute the query using Redis - result, err := client.Do(ctx, queryArgs...).Result() + logs, err := repository.RetrieveIngestionLogs(ctx, in, pageSize, offset) if err != nil { return nil, fmt.Errorf("failed to retrieve ingestion logs: %w", err) } - res := convertMapInterface(result) - - jsonBytes, err := json.Marshal(res) - if err != nil { - return nil, fmt.Errorf("error marshaling ingestion logs: %w", err) - } - - var response redisLogsResponse - - // Unmarshal the result into the struct - if err = json.Unmarshal(jsonBytes, &response); err != nil { - return nil, fmt.Errorf("error parsing JSON: %w", err) - } - - logs := make([]*reconcilerpb.IngestionLog, 0) - - for _, logsResult := range response.Results { - ingestionLog := &reconcilerpb.IngestionLog{} - if err := protojson.Unmarshal([]byte(logsResult.ExtraAttributes.ExtraAttributes), ingestionLog); err != nil { - return nil, fmt.Errorf("error parsing ExtraAttributes JSON: %v", err) - } - - logs = append(logs, ingestionLog) - } - var nextPageToken string if len(logs) == int(pageSize) { @@ -203,71 +78,37 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi } // Fill metrics - var metrics reconcilerpb.IngestionMetrics - if in.State != nil { - if in.GetState() == reconcilerpb.State_UNSPECIFIED { - metrics.Total = response.TotalResults - } else if in.GetState() == reconcilerpb.State_QUEUED { - metrics.Queued = response.TotalResults - } else if in.GetState() == reconcilerpb.State_RECONCILED { - metrics.Reconciled = response.TotalResults - } else if in.GetState() == reconcilerpb.State_FAILED { - metrics.Failed = response.TotalResults - } else if in.GetState() == reconcilerpb.State_NO_CHANGES { - metrics.NoChanges = response.TotalResults - } - } else { - metrics.Total = response.TotalResults - } + metrics := ingestionLogsMetricsResponse.GetMetrics() - return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: &metrics, NextPageToken: nextPageToken}, nil + return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: metrics, NextPageToken: nextPageToken}, nil } -func buildQueryFilter(req *reconcilerpb.RetrieveIngestionLogsRequest) string { - queryFilter := "*" - - // apply optional filters for ingestion timestamps (start and end) - if req.GetIngestionTsStart() > 0 || req.GetIngestionTsEnd() > 0 { - ingestionTsFilter := fmt.Sprintf("@ingestion_ts:[%d inf]", req.GetIngestionTsStart()) - - if req.GetIngestionTsEnd() > 0 { - ingestionTsFilter = fmt.Sprintf("@ingestion_ts:[%d %d]", req.GetIngestionTsStart(), req.GetIngestionTsEnd()) - } - - queryFilter = ingestionTsFilter - } - - // apply optional filters for ingestion state - if req.State != nil { - state := escapeSpecialChars(req.GetState().String()) - stateFilter := fmt.Sprintf("@state:{%s}", state) - if queryFilter == "*" { - queryFilter = stateFilter - } else { - queryFilter = fmt.Sprintf("%s %s", queryFilter, stateFilter) - } +func decodeBase64ToInt(encoded string) (int32, error) { + // Decode the base64 string back to bytes + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return 0, err } - if req.GetDataType() != "" { - dataType := escapeSpecialChars(req.GetDataType()) - dataTypeFilter := fmt.Sprintf("@data_type:{%s}", dataType) - if queryFilter == "*" { - queryFilter = dataTypeFilter - } else { - queryFilter = fmt.Sprintf("%s %s", queryFilter, dataTypeFilter) - } + // Convert the byte slice back to int64 + buf := bytes.NewReader(decoded) + var num int32 + if err := binary.Read(buf, binary.BigEndian, &num); err != nil { + return 0, err } - return queryFilter + return num, nil } -func escapeSpecialChars(s string) string { - // replace ,.<>{}[]"':;!@#$%^&*()-+=~ with double backslash - // ref: https://redis.io/docs/latest/develop/interact/search-and-query/advanced-concepts/escaping/ - oldNew := []string{ - ",", "\\,", ".", "\\.", "<", "\\<", ">", "\\>", "{", "\\{", "}", "\\}", "[", "\\[", "]", "\\]", "\"", "\\\"", - "'", "\\'", ":", "\\:", ";", "\\;", "!", "\\!", "@", "\\@", "#", "\\#", "$", "\\$", "%", "\\%", "^", "\\^", - "&", "\\&", "*", "\\*", "(", "\\(", ")", "\\)", "-", "\\-", "+", "\\+", "=", "\\=", "~", "\\~", +func encodeIntToBase64(num int32) string { + // Create a buffer to hold the binary representation + buf := new(bytes.Buffer) + + // Write the int value as a binary value into the buffer + if err := binary.Write(buf, binary.BigEndian, num); err != nil { + fmt.Println("error writing binary:", err) } - return strings.NewReplacer(oldNew...).Replace(s) + + // Encode the binary data to base64 + return base64.StdEncoding.EncodeToString(buf.Bytes()) } diff --git a/diode-server/reconciler/logs_retriever_test.go b/diode-server/reconciler/logs_retriever_test.go deleted file mode 100644 index c15137a7..00000000 --- a/diode-server/reconciler/logs_retriever_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package reconciler - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEscapeSpecialChars(t *testing.T) { - tests := []struct { - name string - input string - want string - }{ - { - name: "empty string", - input: "", - want: "", - }, - { - name: "string with all considered special characters", - input: `,.<>{}[]"':;!@#$%^&*()-+=~`, - want: `\,\.\<\>\{\}\[\]\"\'\:\;\!\@\#\$\%\^\&\*\(\)\-\+\=\~`, - }, - { - name: "producer app name", - input: "example-app", - want: "example\\-app", - }, - { - name: "producer app version", - input: "0.1.0", - want: "0\\.1\\.0", - }, - { - name: "request ID", - input: "123e4567-e89b-12d3-a456-426614174000", - want: "123e4567\\-e89b\\-12d3\\-a456\\-426614174000", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := escapeSpecialChars(tt.input); got != tt.want { - assert.Equal(t, tt.want, got) - } - }) - } -} diff --git a/diode-server/reconciler/migration.go b/diode-server/reconciler/migration.go deleted file mode 100644 index dbecbbbb..00000000 --- a/diode-server/reconciler/migration.go +++ /dev/null @@ -1,164 +0,0 @@ -package reconciler - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "log/slog" - "time" - - "github.com/redis/go-redis/v9" -) - -const ( - // RedisDiodeMigrationsKey is the key for the redis diode migrations - RedisDiodeMigrationsKey = "diode.migrations" -) - -// AppliedMigrations is a list of applied migrations -type AppliedMigrations []MigrationLog - -// MigrationLog is a log of a migration -type MigrationLog struct { - Name string `json:"name"` - ApplyTs int64 `json:"apply_ts"` -} - -type migration struct { - name string - run func(context.Context, *slog.Logger, RedisClient) error -} - -func migrate(ctx context.Context, logger *slog.Logger, redisClient RedisClient) error { - res, err := redisClient.Do(ctx, "JSON.GET", RedisDiodeMigrationsKey).Result() - if err != nil && !errors.Is(err, redis.Nil) { - return fmt.Errorf("failed to get JSON redis key %s: %v", RedisDiodeMigrationsKey, err) - } - - var appliedMigrations AppliedMigrations - if res != nil { - _ = json.Unmarshal([]byte(res.(string)), &appliedMigrations) - } - - logger.Debug("migrations", "appliedMigrations", appliedMigrations) - - if len(appliedMigrations) == 0 { - logger.Debug("no applied migrations found") - } - - migrations := []migration{ - { - name: "0001_initial", - run: initialMigration(), - }, - } - - for _, m := range migrations { - var found bool - for _, am := range appliedMigrations { - if am.Name == m.name { - found = true - break - } - } - - if !found { - logger.Debug("applying migration", "name", m.name) - - if err := m.run(ctx, logger, redisClient); err != nil { - return fmt.Errorf("failed to run migration %s: %v", m.name, err) - } - - logger.Debug("migration applied", "name", m.name) - - appliedMigrations = append(appliedMigrations, MigrationLog{ - Name: m.name, - ApplyTs: time.Now().UnixNano(), - }) - } - } - - appliedMigrationsJSON, err := json.Marshal(appliedMigrations) - if err != nil { - return fmt.Errorf("failed to marshal applied migrations %#v: %v", appliedMigrations, err) - } - - if _, err = redisClient.Do(ctx, "JSON.SET", RedisDiodeMigrationsKey, "$", appliedMigrationsJSON).Result(); err != nil { - return fmt.Errorf("failed to set JSON redis key %s with value %s: %v", RedisDiodeMigrationsKey, appliedMigrationsJSON, err) - } - - return nil -} - -func initialMigration() func(context.Context, *slog.Logger, RedisClient) error { - return func(ctx context.Context, logger *slog.Logger, redisClient RedisClient) error { - // Drop FT index ingest-entity due to schema change - logger.Debug("dropping index", "name", RedisIngestEntityIndexName) - _, err := redisClient.Do(ctx, "FT.DROPINDEX", RedisIngestEntityIndexName).Result() - if err != nil && !errors.Is(err, redis.Nil) && err.Error() != "Unknown Index name" { - return fmt.Errorf("failed to drop FT index %s: %v", RedisIngestEntityIndexName, err) - } - - // Delete all keys with prefix ingest-entity - logger.Debug("deleting keys with prefix", "prefix", "ingest-entity:*") - iter := redisClient.Scan(ctx, 0, "ingest-entity:*", 10).Iterator() - for iter.Next(ctx) { - if err := redisClient.Del(ctx, iter.Val()).Err(); err != nil { - return fmt.Errorf("failed to delete key %s: %v", iter.Val(), err) - } - } - if err := iter.Err(); err != nil { - return fmt.Errorf("failed to iterate over keys with prefix %s: %v", RedisIngestEntityIndexName, err) - } - - // Create new FT index ingest-entity - logger.Debug("creating index", "name", RedisIngestEntityIndexName) - queryArgs := []interface{}{ - "FT.CREATE", - RedisIngestEntityIndexName, - "ON", - "JSON", - "PREFIX", - "1", - "ingest-entity:", - "SCHEMA", - "$.id", - "AS", - "id", - "TEXT", - "SORTABLE", - "$.dataType", - "AS", - "data_type", - "TAG", - "$.state", - "AS", - "state", - "TAG", - "$.requestId", - "AS", - "request_id", - "TAG", - "$.producerAppName", - "AS", - "producer_app_name", - "TAG", - "$.producerAppVersion", - "AS", - "producer_app_version", - "TAG", - "$.ingestionTs", - "AS", - "ingestion_ts", - "NUMERIC", - "SORTABLE", - } - - if _, err = redisClient.Do(ctx, queryArgs...).Result(); err != nil { - return fmt.Errorf("failed to create FT index %s: %v", RedisIngestEntityIndexName, err) - } - - return nil - } -} diff --git a/diode-server/reconciler/migration_test.go b/diode-server/reconciler/migration_test.go deleted file mode 100644 index 06f1c91a..00000000 --- a/diode-server/reconciler/migration_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package reconciler - -import ( - "context" - "encoding/json" - "errors" - "log/slog" - "os" - "testing" - "time" - - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - mr "github.com/netboxlabs/diode/diode-server/reconciler/mocks" -) - -func TestMigrate(t *testing.T) { - tests := []struct { - name string - appliedMigrations []MigrationLog - err error - }{ - { - name: "no applied migrations found", - appliedMigrations: nil, - err: nil, - }, - { - name: "applied migrations found", - appliedMigrations: []MigrationLog{{Name: "0001_initial", ApplyTs: time.Now().Unix()}}, - err: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mockRedisClient := new(mr.RedisClient) - - processor := &IngestionProcessor{ - redisClient: mockRedisClient, - logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})), - Config: Config{ - AutoApplyChangesets: true, - ReconcilerRateLimiterRPS: 20, - ReconcilerRateLimiterBurst: 1, - }, - } - - ctx := context.Background() - - if tt.appliedMigrations == nil { - cmd := redis.NewCmd(ctx) - if tt.err != nil { - cmd.SetErr(errors.New("error")) - } else { - cmd.SetVal(nil) - cmd.SetErr(nil) - } - mockRedisClient.On("Do", context.Background(), "JSON.GET", RedisDiodeMigrationsKey).Return(cmd) - mockRedisClient.On("Do", context.Background(), "FT.DROPINDEX", RedisIngestEntityIndexName).Return(cmd) - scanResults := []string{"ingest-entity:1", "ingest-entity:2", "ingest-entity:3"} - mockRedisClient.On("Scan", context.Background(), uint64(0), "ingest-entity:*", int64(10)).Return(redis.NewScanCmdResult(scanResults, 0, nil)) - for _, key := range scanResults { - mockRedisClient.On("Del", context.Background(), key).Return(redis.NewIntResult(0, nil)) - } - mockRedisClient.On("Do", context.Background(), - "FT.CREATE", - RedisIngestEntityIndexName, - "ON", - "JSON", - "PREFIX", - "1", - "ingest-entity:", - "SCHEMA", - "$.id", - "AS", - "id", - "TEXT", - "SORTABLE", - "$.dataType", - "AS", - "data_type", - "TAG", - "$.state", - "AS", - "state", - "TAG", - "$.requestId", - "AS", - "request_id", - "TAG", - "$.producerAppName", - "AS", - "producer_app_name", - "TAG", - "$.producerAppVersion", - "AS", - "producer_app_version", - "TAG", - "$.ingestionTs", - "AS", - "ingestion_ts", - "NUMERIC", - "SORTABLE", - ).Return(cmd) - mockRedisClient.On("Do", context.Background(), "JSON.SET", RedisDiodeMigrationsKey, "$", mock.Anything).Return(cmd) - } else { - getAppliedMigrationsRespCmd := redis.NewCmd(ctx) - appliedMigrationsJSON, _ := json.Marshal(tt.appliedMigrations) - getAppliedMigrationsRespCmd.SetVal(string(appliedMigrationsJSON)) - getAppliedMigrationsRespCmd.SetErr(nil) - mockRedisClient.On("Do", context.Background(), "JSON.GET", RedisDiodeMigrationsKey).Return(getAppliedMigrationsRespCmd) - mockRedisClient.On("Do", context.Background(), "JSON.SET", RedisDiodeMigrationsKey, "$", appliedMigrationsJSON).Return(redis.NewCmd(ctx)) - } - - err := migrate(ctx, processor.logger, mockRedisClient) - if tt.err != nil { - assert.Error(t, err) - assert.Equal(t, tt.err, err) - } else { - assert.NoError(t, err) - } - }) - } -} diff --git a/diode-server/reconciler/mocks/changesetrepository.go b/diode-server/reconciler/mocks/changesetrepository.go deleted file mode 100644 index e80840a2..00000000 --- a/diode-server/reconciler/mocks/changesetrepository.go +++ /dev/null @@ -1,85 +0,0 @@ -// Code generated by mockery v2.49.1. DO NOT EDIT. - -package mocks - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" - - reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" -) - -// ChangeSetRepository is an autogenerated mock type for the ChangeSetRepository type -type ChangeSetRepository struct { - mock.Mock -} - -type ChangeSetRepository_Expecter struct { - mock *mock.Mock -} - -func (_m *ChangeSetRepository) EXPECT() *ChangeSetRepository_Expecter { - return &ChangeSetRepository_Expecter{mock: &_m.Mock} -} - -// CreateChangeSet provides a mock function with given fields: ctx, changeSet -func (_m *ChangeSetRepository) CreateChangeSet(ctx context.Context, changeSet *reconcilerpb.ChangeSet) error { - ret := _m.Called(ctx, changeSet) - - if len(ret) == 0 { - panic("no return value specified for CreateChangeSet") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.ChangeSet) error); ok { - r0 = rf(ctx, changeSet) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ChangeSetRepository_CreateChangeSet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateChangeSet' -type ChangeSetRepository_CreateChangeSet_Call struct { - *mock.Call -} - -// CreateChangeSet is a helper method to define mock.On call -// - ctx context.Context -// - changeSet *reconcilerpb.ChangeSet -func (_e *ChangeSetRepository_Expecter) CreateChangeSet(ctx interface{}, changeSet interface{}) *ChangeSetRepository_CreateChangeSet_Call { - return &ChangeSetRepository_CreateChangeSet_Call{Call: _e.mock.On("CreateChangeSet", ctx, changeSet)} -} - -func (_c *ChangeSetRepository_CreateChangeSet_Call) Run(run func(ctx context.Context, changeSet *reconcilerpb.ChangeSet)) *ChangeSetRepository_CreateChangeSet_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*reconcilerpb.ChangeSet)) - }) - return _c -} - -func (_c *ChangeSetRepository_CreateChangeSet_Call) Return(_a0 error) *ChangeSetRepository_CreateChangeSet_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ChangeSetRepository_CreateChangeSet_Call) RunAndReturn(run func(context.Context, *reconcilerpb.ChangeSet) error) *ChangeSetRepository_CreateChangeSet_Call { - _c.Call.Return(run) - return _c -} - -// NewChangeSetRepository creates a new instance of ChangeSetRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewChangeSetRepository(t interface { - mock.TestingT - Cleanup(func()) -}) *ChangeSetRepository { - mock := &ChangeSetRepository{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/diode-server/reconciler/mocks/client.go b/diode-server/reconciler/mocks/client.go index 842ecd68..f2c2e177 100644 --- a/diode-server/reconciler/mocks/client.go +++ b/diode-server/reconciler/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.49.1. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -25,7 +25,7 @@ func (_m *Client) EXPECT() *Client_Expecter { return &Client_Expecter{mock: &_m.Mock} } -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *Client) Close() error { ret := _m.Called() diff --git a/diode-server/reconciler/mocks/ingestionlogrepository.go b/diode-server/reconciler/mocks/ingestionlogrepository.go deleted file mode 100644 index c9371d2e..00000000 --- a/diode-server/reconciler/mocks/ingestionlogrepository.go +++ /dev/null @@ -1,86 +0,0 @@ -// Code generated by mockery v2.49.1. DO NOT EDIT. - -package mocks - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" - - reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" -) - -// IngestionLogRepository is an autogenerated mock type for the IngestionLogRepository type -type IngestionLogRepository struct { - mock.Mock -} - -type IngestionLogRepository_Expecter struct { - mock *mock.Mock -} - -func (_m *IngestionLogRepository) EXPECT() *IngestionLogRepository_Expecter { - return &IngestionLogRepository_Expecter{mock: &_m.Mock} -} - -// CreateIngestionLog provides a mock function with given fields: ctx, ingestionLog, sourceMetadata -func (_m *IngestionLogRepository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) error { - ret := _m.Called(ctx, ingestionLog, sourceMetadata) - - if len(ret) == 0 { - panic("no return value specified for CreateIngestionLog") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) error); ok { - r0 = rf(ctx, ingestionLog, sourceMetadata) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// IngestionLogRepository_CreateIngestionLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateIngestionLog' -type IngestionLogRepository_CreateIngestionLog_Call struct { - *mock.Call -} - -// CreateIngestionLog is a helper method to define mock.On call -// - ctx context.Context -// - ingestionLog *reconcilerpb.IngestionLog -// - sourceMetadata []byte -func (_e *IngestionLogRepository_Expecter) CreateIngestionLog(ctx interface{}, ingestionLog interface{}, sourceMetadata interface{}) *IngestionLogRepository_CreateIngestionLog_Call { - return &IngestionLogRepository_CreateIngestionLog_Call{Call: _e.mock.On("CreateIngestionLog", ctx, ingestionLog, sourceMetadata)} -} - -func (_c *IngestionLogRepository_CreateIngestionLog_Call) Run(run func(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte)) *IngestionLogRepository_CreateIngestionLog_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*reconcilerpb.IngestionLog), args[2].([]byte)) - }) - return _c -} - -func (_c *IngestionLogRepository_CreateIngestionLog_Call) Return(_a0 error) *IngestionLogRepository_CreateIngestionLog_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *IngestionLogRepository_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte) error) *IngestionLogRepository_CreateIngestionLog_Call { - _c.Call.Return(run) - return _c -} - -// NewIngestionLogRepository creates a new instance of IngestionLogRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewIngestionLogRepository(t interface { - mock.TestingT - Cleanup(func()) -}) *IngestionLogRepository { - mock := &IngestionLogRepository{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/diode-server/reconciler/mocks/redisclient.go b/diode-server/reconciler/mocks/redisclient.go index 74586f30..e2ede4a9 100644 --- a/diode-server/reconciler/mocks/redisclient.go +++ b/diode-server/reconciler/mocks/redisclient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.49.1. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -23,7 +23,7 @@ func (_m *RedisClient) EXPECT() *RedisClient_Expecter { return &RedisClient_Expecter{mock: &_m.Mock} } -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *RedisClient) Close() error { ret := _m.Called() @@ -238,7 +238,7 @@ func (_c *RedisClient_Ping_Call) RunAndReturn(run func(context.Context) *redis.S return _c } -// Pipeline provides a mock function with given fields: +// Pipeline provides a mock function with no fields func (_m *RedisClient) Pipeline() redis.Pipeliner { ret := _m.Called() diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go new file mode 100644 index 00000000..176915e0 --- /dev/null +++ b/diode-server/reconciler/mocks/repository.go @@ -0,0 +1,328 @@ +// Code generated by mockery v2.50.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + changeset "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + + mock "github.com/stretchr/testify/mock" + + reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" +) + +// Repository is an autogenerated mock type for the Repository type +type Repository struct { + mock.Mock +} + +type Repository_Expecter struct { + mock *mock.Mock +} + +func (_m *Repository) EXPECT() *Repository_Expecter { + return &Repository_Expecter{mock: &_m.Mock} +} + +// CountIngestionLogsPerState provides a mock function with given fields: ctx +func (_m *Repository) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for CountIngestionLogsPerState") + } + + var r0 map[reconcilerpb.State]int32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[reconcilerpb.State]int32, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[reconcilerpb.State]int32); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[reconcilerpb.State]int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Repository_CountIngestionLogsPerState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CountIngestionLogsPerState' +type Repository_CountIngestionLogsPerState_Call struct { + *mock.Call +} + +// CountIngestionLogsPerState is a helper method to define mock.On call +// - ctx context.Context +func (_e *Repository_Expecter) CountIngestionLogsPerState(ctx interface{}) *Repository_CountIngestionLogsPerState_Call { + return &Repository_CountIngestionLogsPerState_Call{Call: _e.mock.On("CountIngestionLogsPerState", ctx)} +} + +func (_c *Repository_CountIngestionLogsPerState_Call) Run(run func(ctx context.Context)) *Repository_CountIngestionLogsPerState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Repository_CountIngestionLogsPerState_Call) Return(_a0 map[reconcilerpb.State]int32, _a1 error) *Repository_CountIngestionLogsPerState_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Repository_CountIngestionLogsPerState_Call) RunAndReturn(run func(context.Context) (map[reconcilerpb.State]int32, error)) *Repository_CountIngestionLogsPerState_Call { + _c.Call.Return(run) + return _c +} + +// CreateChangeSet provides a mock function with given fields: ctx, changeSet, ingestionLogID +func (_m *Repository) CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error) { + ret := _m.Called(ctx, changeSet, ingestionLogID) + + if len(ret) == 0 { + panic("no return value specified for CreateChangeSet") + } + + var r0 *int32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, changeset.ChangeSet, int32) (*int32, error)); ok { + return rf(ctx, changeSet, ingestionLogID) + } + if rf, ok := ret.Get(0).(func(context.Context, changeset.ChangeSet, int32) *int32); ok { + r0 = rf(ctx, changeSet, ingestionLogID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, changeset.ChangeSet, int32) error); ok { + r1 = rf(ctx, changeSet, ingestionLogID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Repository_CreateChangeSet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateChangeSet' +type Repository_CreateChangeSet_Call struct { + *mock.Call +} + +// CreateChangeSet is a helper method to define mock.On call +// - ctx context.Context +// - changeSet changeset.ChangeSet +// - ingestionLogID int32 +func (_e *Repository_Expecter) CreateChangeSet(ctx interface{}, changeSet interface{}, ingestionLogID interface{}) *Repository_CreateChangeSet_Call { + return &Repository_CreateChangeSet_Call{Call: _e.mock.On("CreateChangeSet", ctx, changeSet, ingestionLogID)} +} + +func (_c *Repository_CreateChangeSet_Call) Run(run func(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32)) *Repository_CreateChangeSet_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(changeset.ChangeSet), args[2].(int32)) + }) + return _c +} + +func (_c *Repository_CreateChangeSet_Call) Return(_a0 *int32, _a1 error) *Repository_CreateChangeSet_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Repository_CreateChangeSet_Call) RunAndReturn(run func(context.Context, changeset.ChangeSet, int32) (*int32, error)) *Repository_CreateChangeSet_Call { + _c.Call.Return(run) + return _c +} + +// CreateIngestionLog provides a mock function with given fields: ctx, ingestionLog, sourceMetadata +func (_m *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { + ret := _m.Called(ctx, ingestionLog, sourceMetadata) + + if len(ret) == 0 { + panic("no return value specified for CreateIngestionLog") + } + + var r0 *int32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)); ok { + return rf(ctx, ingestionLog, sourceMetadata) + } + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) *int32); ok { + r0 = rf(ctx, ingestionLog, sourceMetadata) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *reconcilerpb.IngestionLog, []byte) error); ok { + r1 = rf(ctx, ingestionLog, sourceMetadata) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Repository_CreateIngestionLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateIngestionLog' +type Repository_CreateIngestionLog_Call struct { + *mock.Call +} + +// CreateIngestionLog is a helper method to define mock.On call +// - ctx context.Context +// - ingestionLog *reconcilerpb.IngestionLog +// - sourceMetadata []byte +func (_e *Repository_Expecter) CreateIngestionLog(ctx interface{}, ingestionLog interface{}, sourceMetadata interface{}) *Repository_CreateIngestionLog_Call { + return &Repository_CreateIngestionLog_Call{Call: _e.mock.On("CreateIngestionLog", ctx, ingestionLog, sourceMetadata)} +} + +func (_c *Repository_CreateIngestionLog_Call) Run(run func(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte)) *Repository_CreateIngestionLog_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*reconcilerpb.IngestionLog), args[2].([]byte)) + }) + return _c +} + +func (_c *Repository_CreateIngestionLog_Call) Return(_a0 *int32, _a1 error) *Repository_CreateIngestionLog_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)) *Repository_CreateIngestionLog_Call { + _c.Call.Return(run) + return _c +} + +// RetrieveIngestionLogs provides a mock function with given fields: ctx, filter, limit, offset +func (_m *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) { + ret := _m.Called(ctx, filter, limit, offset) + + if len(ret) == 0 { + panic("no return value specified for RetrieveIngestionLogs") + } + + var r0 []*reconcilerpb.IngestionLog + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.RetrieveIngestionLogsRequest, int32, int32) ([]*reconcilerpb.IngestionLog, error)); ok { + return rf(ctx, filter, limit, offset) + } + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.RetrieveIngestionLogsRequest, int32, int32) []*reconcilerpb.IngestionLog); ok { + r0 = rf(ctx, filter, limit, offset) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*reconcilerpb.IngestionLog) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *reconcilerpb.RetrieveIngestionLogsRequest, int32, int32) error); ok { + r1 = rf(ctx, filter, limit, offset) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Repository_RetrieveIngestionLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveIngestionLogs' +type Repository_RetrieveIngestionLogs_Call struct { + *mock.Call +} + +// RetrieveIngestionLogs is a helper method to define mock.On call +// - ctx context.Context +// - filter *reconcilerpb.RetrieveIngestionLogsRequest +// - limit int32 +// - offset int32 +func (_e *Repository_Expecter) RetrieveIngestionLogs(ctx interface{}, filter interface{}, limit interface{}, offset interface{}) *Repository_RetrieveIngestionLogs_Call { + return &Repository_RetrieveIngestionLogs_Call{Call: _e.mock.On("RetrieveIngestionLogs", ctx, filter, limit, offset)} +} + +func (_c *Repository_RetrieveIngestionLogs_Call) Run(run func(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32)) *Repository_RetrieveIngestionLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*reconcilerpb.RetrieveIngestionLogsRequest), args[2].(int32), args[3].(int32)) + }) + return _c +} + +func (_c *Repository_RetrieveIngestionLogs_Call) Return(_a0 []*reconcilerpb.IngestionLog, _a1 error) *Repository_RetrieveIngestionLogs_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Repository_RetrieveIngestionLogs_Call) RunAndReturn(run func(context.Context, *reconcilerpb.RetrieveIngestionLogsRequest, int32, int32) ([]*reconcilerpb.IngestionLog, error)) *Repository_RetrieveIngestionLogs_Call { + _c.Call.Return(run) + return _c +} + +// UpdateIngestionLogStateWithError provides a mock function with given fields: ctx, id, state, ingestionError +func (_m *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error { + ret := _m.Called(ctx, id, state, ingestionError) + + if len(ret) == 0 { + panic("no return value specified for UpdateIngestionLogStateWithError") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int32, reconcilerpb.State, *reconcilerpb.IngestionError) error); ok { + r0 = rf(ctx, id, state, ingestionError) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_UpdateIngestionLogStateWithError_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateIngestionLogStateWithError' +type Repository_UpdateIngestionLogStateWithError_Call struct { + *mock.Call +} + +// UpdateIngestionLogStateWithError is a helper method to define mock.On call +// - ctx context.Context +// - id int32 +// - state reconcilerpb.State +// - ingestionError *reconcilerpb.IngestionError +func (_e *Repository_Expecter) UpdateIngestionLogStateWithError(ctx interface{}, id interface{}, state interface{}, ingestionError interface{}) *Repository_UpdateIngestionLogStateWithError_Call { + return &Repository_UpdateIngestionLogStateWithError_Call{Call: _e.mock.On("UpdateIngestionLogStateWithError", ctx, id, state, ingestionError)} +} + +func (_c *Repository_UpdateIngestionLogStateWithError_Call) Run(run func(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError)) *Repository_UpdateIngestionLogStateWithError_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int32), args[2].(reconcilerpb.State), args[3].(*reconcilerpb.IngestionError)) + }) + return _c +} + +func (_c *Repository_UpdateIngestionLogStateWithError_Call) Return(_a0 error) *Repository_UpdateIngestionLogStateWithError_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_UpdateIngestionLogStateWithError_Call) RunAndReturn(run func(context.Context, int32, reconcilerpb.State, *reconcilerpb.IngestionError) error) *Repository_UpdateIngestionLogStateWithError_Call { + _c.Call.Return(run) + return _c +} + +// NewRepository creates a new instance of Repository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *Repository { + mock := &Repository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/diode-server/reconciler/repositories.go b/diode-server/reconciler/repositories.go deleted file mode 100644 index 339f994b..00000000 --- a/diode-server/reconciler/repositories.go +++ /dev/null @@ -1,17 +0,0 @@ -package reconciler - -import ( - "context" - - "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" -) - -// IngestionLogRepository is an interface for interacting with ingestion logs. -type IngestionLogRepository interface { - CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) error -} - -// ChangeSetRepository is an interface for interacting with change sets. -type ChangeSetRepository interface { - CreateChangeSet(ctx context.Context, changeSet *reconcilerpb.ChangeSet) error -} diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go new file mode 100644 index 00000000..db1fb611 --- /dev/null +++ b/diode-server/reconciler/repository.go @@ -0,0 +1,17 @@ +package reconciler + +import ( + "context" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" +) + +// Repository is an interface for interacting with ingestion logs and change sets. +type Repository interface { + CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) + UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error + RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) + CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) + CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error) +} diff --git a/diode-server/reconciler/server.go b/diode-server/reconciler/server.go index ca1c7856..84484e8f 100644 --- a/diode-server/reconciler/server.go +++ b/diode-server/reconciler/server.go @@ -35,11 +35,12 @@ type Server struct { grpcListener net.Listener grpcServer *grpc.Server redisClient RedisClient + repository Repository apiKeys APIKeys } // NewServer creates a new reconciler server -func NewServer(ctx context.Context, logger *slog.Logger) (*Server, error) { +func NewServer(ctx context.Context, logger *slog.Logger, repository Repository) (*Server, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -72,6 +73,7 @@ func NewServer(ctx context.Context, logger *slog.Logger) (*Server, error) { grpcListener: grpcListener, grpcServer: grpcServer, redisClient: redisClient, + repository: repository, apiKeys: apiKeys, } @@ -140,7 +142,7 @@ func (s *Server) RetrieveIngestionDataSources(_ context.Context, in *reconcilerp // RetrieveIngestionLogs retrieves logs func (s *Server) RetrieveIngestionLogs(ctx context.Context, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { - return retrieveIngestionLogs(ctx, s.logger, s.redisClient, in) + return retrieveIngestionLogs(ctx, s.logger, s.repository, in) } func validateRetrieveIngestionDataSourcesRequest(in *reconcilerpb.RetrieveIngestionDataSourcesRequest) error { diff --git a/diode-server/reconciler/server_internal_test.go b/diode-server/reconciler/server_internal_test.go index d2df1165..f80f438c 100644 --- a/diode-server/reconciler/server_internal_test.go +++ b/diode-server/reconciler/server_internal_test.go @@ -7,7 +7,6 @@ import ( "os" "testing" - "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -17,25 +16,6 @@ import ( mr "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) -// MockPipeliner is a mock implementation of the redis Pipeliner interface. -type MockPipeliner struct { - mock.Mock - redis.Pipeliner -} - -// Do is a mock of Pipeliner's Do method. -func (m *MockPipeliner) Do(ctx context.Context, args ...interface{}) *redis.Cmd { - calledArgs := m.Called(ctx, args) - return calledArgs.Get(0).(*redis.Cmd) -} - -// Exec is a mock of Pipeliner's Exec method. -func (m *MockPipeliner) Exec(ctx context.Context) ([]redis.Cmder, error) { - args := m.Called(ctx) - cmds := make([]redis.Cmder, 0) - return cmds, args.Error(0) -} - func TestIsAuthenticated(t *testing.T) { tests := []struct { name string @@ -128,42 +108,70 @@ func TestIsAuthenticated(t *testing.T) { func TestRetrieveLogs(t *testing.T) { tests := []struct { - name string - in reconcilerpb.RetrieveIngestionLogsRequest - result interface{} - response *reconcilerpb.RetrieveIngestionLogsResponse - queryFilter string - queryLimitOffset int32 - failCmd bool - hasError bool + name string + in reconcilerpb.RetrieveIngestionLogsRequest + ingestionLogsPerState map[reconcilerpb.State]int32 + ingestionLogs []*reconcilerpb.IngestionLog + response *reconcilerpb.RetrieveIngestionLogsResponse + hasError bool }{ { name: "valid request", in: reconcilerpb.RetrieveIngestionLogsRequest{}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 2, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_RECONCILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.device","entity":{"device":{"name":"Conference_Room_AP_02","deviceType":{"model":"Cisco Aironet 3802","manufacturer":{"name":"Cisco"}},"role":{"name":"Wireless_AP"},"serial":"PQR456789012","site":{"name":"HQ"}}},"id":"2mC8GVBGFg6NyLsQxuS4IYMB6FI","ingestionTs":1725552654541975975,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"bc1052e3-656a-42f0-b364-27b385e02a0c","sdkName":"diode-sdk-python","sdkVersion":"0.0.1","state":2}`, - "ingestion_ts": "1725552654541976064", + Error: nil, + }, + { + Id: "2mC8GVBGFg6NyLsQxuS4IYMB6FI", + DataType: "dcim.device", + State: reconcilerpb.State_RECONCILED, + RequestId: "bc1052e3-656a-42f0-b364-27b385e02a0c", + IngestionTs: 1725552654541975975, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-python", + SdkVersion: "0.0.1", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: "Conference_Room_AP_02", + DeviceType: &diodepb.DeviceType{ + Model: "Cisco Aironet 3802", + Manufacturer: &diodepb.Manufacturer{ + Name: "Cisco", + }, + }, + Role: &diodepb.Role{Name: "Wireless_AP"}, + Serial: strPtr("PQR456789012"), + Site: &diodepb.Site{Name: "HQ"}, + }, }, - "id": "ingest-entity:dcim.device-1725552654541975975-2mC8GVBGFg6NyLsQxuS4IYMB6FI", - "values": []interface{}{}, }, }, - "total_results": 2, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -216,34 +224,53 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 2, + Total: 2, + Reconciled: 2, }, NextPageToken: "F/Jk/zc08gA=", }, - queryFilter: "*", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "request with reconciliation error", in: reconcilerpb.RetrieveIngestionLogsRequest{}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"ipam.ipaddress","entity":{"ip_address":{"address":"192.168.1.1","interface":null,"description":"Vendor: HUAWEI TECHNOLOGIES"}},"error":{"message":"failed to apply change set","code":400,"details":{"change_set_id":"6304c706-f955-4bcb-a1cc-514293d53d07","result":"failed","errors":[{"error":"address: Duplicate IP address found in global table: 192.168.1.1/32","change_id":"ff9e29b2-7a64-40ba-99a8-21f44768f60a"}]}},"id":"2mC8KCvHNasrYlfxSASk9hatfYC","ingestionTs":1725046967777525928,"producerAppName":"example-app","producerAppVersion":"0.1.0","request_id":"e03c4892-5b7e-4c39-b5e6-0225a264ab8b","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":3}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_FAILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + DataType: "ipam.ipaddress", + State: reconcilerpb.State_FAILED, + RequestId: "e03c4892-5b7e-4c39-b5e6-0225a264ab8b", + IngestionTs: 1725046967777525928, + ProducerAppName: "example-app", + ProducerAppVersion: "0.1.0", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_IpAddress{ + IpAddress: &diodepb.IPAddress{ + Address: "192.168.1.1", + Description: strPtr("Vendor: HUAWEI TECHNOLOGIES"), + }, + }, + }, + Error: &reconcilerpb.IngestionError{ + Message: "failed to apply change set", + Code: 400, + Details: &reconcilerpb.IngestionError_Details{ + ChangeSetId: "6304c706-f955-4bcb-a1cc-514293d53d07", + Result: "failed", + Errors: []*reconcilerpb.IngestionError_Details_Error{ + { + ChangeId: "ff9e29b2-7a64-40ba-99a8-21f44768f60a", + Error: "address: Duplicate IP address found in global table: 192.168.1.1/32", + }, + }, }, - "id": "ingest-entity:ipam.ipaddress-1725046967777525928-2mC8KCvHNasrYlfxSASk9hatfYC", - "values": []interface{}{}, }, }, - "total_results": 2, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -280,34 +307,42 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 2, + Total: 1, + Failed: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "*", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by new state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_QUEUED.Enum()}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mC8NYwfIKM5rFDibDBuytASSOi","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":1}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_QUEUED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + DataType: "dcim.interface", + State: reconcilerpb.State_QUEUED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mC8NYwfIKM5rFDibDBuytASSOi", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -333,34 +368,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Queued: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@state:{QUEUED}", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by reconciled state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_RECONCILED.Enum()}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_RECONCILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -387,34 +431,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Reconciled: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@state:{RECONCILED}", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by failed state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_FAILED.Enum()}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":3}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_FAILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_FAILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -441,34 +494,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Failed: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@state:{FAILED}", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by no changes state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_NO_CHANGES.Enum()}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":4}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_NO_CHANGES: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_NO_CHANGES, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -495,34 +557,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, NoChanges: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@state:{NO_CHANGES}", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by data type", in: reconcilerpb.RetrieveIngestionLogsRequest{DataType: "dcim.interface"}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_RECONCILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -549,34 +620,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@data_type:{dcim\\.interface}", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "filter by timestamp", in: reconcilerpb.RetrieveIngestionLogsRequest{IngestionTsStart: 1725552914392208639}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_RECONCILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", + }, + Name: "Gig 2", + }, }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, }, + Error: nil, }, - "total_results": 1, - "warning": []interface{}{}, - }), + }, response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -603,112 +683,43 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "@ingestion_ts:[1725552914392208639 inf]", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { name: "pagination check", in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "AAAFlg=="}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", - }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, - }, - }, - "total_results": 1, - "warning": []interface{}{}, - }), - response: &reconcilerpb.RetrieveIngestionLogsResponse{ - Logs: []*reconcilerpb.IngestionLog{ - { - Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", - DataType: "dcim.interface", - State: reconcilerpb.State_RECONCILED, - RequestId: "req-id", - IngestionTs: 1725552914392208722, - ProducerAppName: "diode-agent", - ProducerAppVersion: "0.0.1", - SdkName: "diode-sdk-go", - SdkVersion: "0.1.0", - Entity: &diodepb.Entity{ - Entity: &diodepb.Entity_Interface{ - Interface: &diodepb.Interface{ - Device: &diodepb.Device{ - Name: "my_dev", - }, - Name: "Gig 2", + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, + ingestionLogs: []*reconcilerpb.IngestionLog{ + { + Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", + DataType: "dcim.interface", + State: reconcilerpb.State_RECONCILED, + RequestId: "req-id", + IngestionTs: 1725552914392208722, + ProducerAppName: "diode-agent", + ProducerAppVersion: "0.0.1", + SdkName: "diode-sdk-go", + SdkVersion: "0.1.0", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Interface{ + Interface: &diodepb.Interface{ + Device: &diodepb.Device{ + Name: "my_dev", }, + Name: "Gig 2", }, }, - Error: nil, }, + Error: nil, }, - Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, - }, - NextPageToken: "AAAFlw==", }, - queryFilter: "*", - queryLimitOffset: 1430, - failCmd: false, - hasError: false, - }, - { - name: "error parsing extra attributes", - in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "AAAFlg=="}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `"extra":is":"invalid"`, - "ingestion_ts": "1725552914392208640", - }, - "id": "ingest-entity:dcim.interface", - "values": []interface{}{}, - }, - }, - "total_results": 1, - "warning": []interface{}{}, - }), - queryFilter: "*", - queryLimitOffset: 1430, - failCmd: false, - hasError: true, - }, - { - name: "error decoding page token", - in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "invalid"}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": "1725552914392208640", - }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, - }, - }, - "total_results": 1, - "warning": []interface{}{}, - }), response: &reconcilerpb.RetrieveIngestionLogsResponse{ Logs: []*reconcilerpb.IngestionLog{ { @@ -735,44 +746,17 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, - queryFilter: "*", - queryLimitOffset: 0, - failCmd: false, - hasError: false, + hasError: false, }, { - name: "error parsing response json", - in: reconcilerpb.RetrieveIngestionLogsRequest{}, - result: interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{ - "extra_attributes": map[interface{}]interface{}{ - "$": `{"dataType":"dcim.interface","entity":{"interface":{"device":{"name":"my_dev"},"name":"Gig 2"}},"id":"2mAT7vZ38H4ttI0i5dBebwJbSnZ","ingestionTs":1725552914392208722,"producerAppName":"diode-agent","producerAppVersion":"0.0.1","request_id":"req-id","sdkName":"diode-sdk-go","sdkVersion":"0.1.0","state":2}`, - "ingestion_ts": 123, - }, - "id": "ingest-entity:dcim.interface-1725552914392208722-2mAT7vZ38H4ttI0i5dBebwJbSnZ", - "values": []interface{}{}, - }, - }, - "total_results": 1, - "warning": []interface{}{}, - }), - queryFilter: "*", - failCmd: false, - hasError: true, - }, - { - name: "redis error", - in: reconcilerpb.RetrieveIngestionLogsRequest{}, - queryFilter: "*", - failCmd: true, - hasError: true, + name: "error decoding page token", + in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "invalid"}, + hasError: true, }, } for i := range tests { @@ -781,19 +765,23 @@ func TestRetrieveLogs(t *testing.T) { ctx := context.Background() logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - mockRedisClient := new(mr.RedisClient) - - cmd := redis.NewCmd(ctx) - cmd.SetVal(tt.result) - if tt.failCmd { - cmd.SetErr(errors.New("error")) - } - mockRedisClient.On("Do", ctx, "FT.SEARCH", "ingest-entity", tt.queryFilter, "SORTBY", "id", "DESC", "LIMIT", tt.queryLimitOffset, int32(100)). - Return(cmd) - + mockRedisClient := mr.NewRedisClient(t) + mockRepository := mr.NewRepository(t) server := &Server{ redisClient: mockRedisClient, logger: logger, + repository: mockRepository, + } + + var retrieveErr error + if tt.hasError { + retrieveErr = errors.New("failed to retrieve ingestion logs") + } + + mockRepository.On("CountIngestionLogsPerState", ctx).Return(tt.ingestionLogsPerState, nil) + + if !tt.hasError { + mockRepository.On("RetrieveIngestionLogs", ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.ingestionLogs, retrieveErr) } response, err := server.RetrieveIngestionLogs(ctx, &tt.in) @@ -815,6 +803,7 @@ func TestRetrieveLogs(t *testing.T) { } require.Equal(t, tt.response.Metrics, response.Metrics) } + mockRepository.AssertExpectations(t) }) } } @@ -823,34 +812,27 @@ func TestRetrieveIngestionLogsMetricsOnly(t *testing.T) { tests := []struct { name string expectedTotal interface{} - cmdError bool - execError error hasError bool errorMsg string }{ { name: "valid request", expectedTotal: int64(10), - cmdError: false, hasError: false, }, { name: "query error", - cmdError: true, hasError: true, errorMsg: "failed to retrieve ingestion logs: cmd error", }, { - name: "exec error", - cmdError: false, - execError: errors.New("exec error"), - hasError: true, - errorMsg: "failed to retrieve ingestion logs: exec error", + name: "exec error", + hasError: true, + errorMsg: "failed to retrieve ingestion logs: exec error", }, { name: "error getting total results", expectedTotal: nil, - cmdError: false, hasError: true, errorMsg: "failed to retrieve ingestion logs: failed to parse total_results", }, @@ -869,83 +851,30 @@ func TestRetrieveIngestionLogsMetricsOnly(t *testing.T) { Total: 10, } - mockRedisClient := new(mr.RedisClient) - - mockPipeliner := new(MockPipeliner) - - cmdTotal := redis.NewCmd(ctx) - if tt.cmdError { - cmdTotal.SetErr(errors.New("cmd error")) + mockRedisClient := mr.NewRedisClient(t) + mockRepository := mr.NewRepository(t) + server := &Server{ + redisClient: mockRedisClient, + logger: logger, + repository: mockRepository, } - cmdTotal.SetVal(interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{}, - }, - "total_results": tt.expectedTotal, - "warning": []interface{}{}, - })) - mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "*", "LIMIT", 0, 0}).Return(cmdTotal) - - cmdNew := redis.NewCmd(ctx) - cmdNew.SetVal(interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{}, - }, - "total_results": int64(expected.Queued), - "warning": []interface{}{}, - })) - mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{QUEUED}", "LIMIT", 0, 0}).Return(cmdNew) - cmdReconciled := redis.NewCmd(ctx) - cmdReconciled.SetVal(interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{}, - }, - "total_results": int64(expected.Reconciled), - "warning": []interface{}{}, - })) - mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{RECONCILED}", "LIMIT", 0, 0}).Return(cmdReconciled) - - cmdFailed := redis.NewCmd(ctx) - cmdFailed.SetVal(interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{}, - }, - "total_results": int64(expected.Failed), - "warning": []interface{}{}, - })) - mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{FAILED}", "LIMIT", 0, 0}).Return(cmdFailed) + ingestionLogStateMetricsMap := map[reconcilerpb.State]int32{ + reconcilerpb.State_QUEUED: expected.Queued, + reconcilerpb.State_RECONCILED: expected.Reconciled, + reconcilerpb.State_FAILED: expected.Failed, + reconcilerpb.State_NO_CHANGES: expected.NoChanges, + } - cmdNoChanges := redis.NewCmd(ctx) - cmdNoChanges.SetVal(interface{}(map[interface{}]interface{}{ - "attributes": []interface{}{}, - "format": "STRING", - "results": []interface{}{ - map[interface{}]interface{}{}, - }, - "total_results": int64(expected.NoChanges), - "warning": []interface{}{}, - })) - mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{NO_CHANGES}", "LIMIT", 0, 0}).Return(cmdNoChanges) + var countErr error + if tt.hasError { + countErr = errors.New(tt.errorMsg) + } - mockPipeliner.On("Exec", ctx).Return(tt.execError) - mockRedisClient.On("Pipeline").Return(mockPipeliner) + mockRepository.On("CountIngestionLogsPerState", ctx).Return(ingestionLogStateMetricsMap, countErr) in := reconcilerpb.RetrieveIngestionLogsRequest{OnlyMetrics: true} - server := &Server{ - redisClient: mockRedisClient, - logger: logger, - } - response, err := server.RetrieveIngestionLogs(ctx, &in) if tt.hasError { require.Error(t, err) @@ -954,6 +883,7 @@ func TestRetrieveIngestionLogsMetricsOnly(t *testing.T) { require.NoError(t, err) require.Equal(t, expected, response.Metrics) } + mockRepository.AssertExpectations(t) }) } } diff --git a/diode-server/reconciler/server_test.go b/diode-server/reconciler/server_test.go index 1a625b05..fbc1f9e0 100644 --- a/diode-server/reconciler/server_test.go +++ b/diode-server/reconciler/server_test.go @@ -16,6 +16,7 @@ import ( pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" ) func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*reconciler.Server, *grpc.ClientConn) { @@ -26,7 +27,8 @@ func startTestServer(ctx context.Context, t *testing.T, redisAddr string) (*reco s := grpc.NewServer() logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - server, err := reconciler.NewServer(ctx, logger) + mockRepository := mocks.NewRepository(t) + server, err := reconciler.NewServer(ctx, logger, mockRepository) require.NoError(t, err) pb.RegisterReconcilerServiceServer(s, server) @@ -60,7 +62,8 @@ func TestNewServer(t *testing.T) { defer teardownEnv() logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - server, err := reconciler.NewServer(ctx, logger) + mockRepository := mocks.NewRepository(t) + server, err := reconciler.NewServer(ctx, logger, mockRepository) require.NoError(t, err) require.NotNil(t, server) diff --git a/diode-server/sqlc.yaml b/diode-server/sqlc.yaml index 5a2f896e..36d45791 100644 --- a/diode-server/sqlc.yaml +++ b/diode-server/sqlc.yaml @@ -10,3 +10,10 @@ sql: sql_package: "pgx/v5" output_models_file_name: "types.go" emit_json_tags: true + overrides: + - column: v_ingestion_logs_with_change_set.change_set + go_type: + type: ChangeSet + - column: changes.data + go_type: + type: any