Skip to content

Commit

Permalink
add implementation for rpc ActionIngestionLog diff operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ltucker committed Dec 23, 2024
1 parent 3f46f8a commit 377dc45
Show file tree
Hide file tree
Showing 20 changed files with 760 additions and 353 deletions.
6 changes: 0 additions & 6 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,7 @@ message ActionIngestionLogRequest {

// The response from the ActionIngestionLog request
message ActionIngestionLogResponse {
message Error {
string message = 1;
int32 code = 2;
}

IngestionLog log = 1;
repeated Error errors = 2;
}

// Reconciler service API
Expand Down
8 changes: 7 additions & 1 deletion diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/netboxlabs/diode/diode-server/dbstore/postgres"
"github.com/netboxlabs/diode/diode-server/migrator"
"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
"github.com/netboxlabs/diode/diode-server/reconciler"
"github.com/netboxlabs/diode/diode-server/server"
)
Expand Down Expand Up @@ -44,6 +45,11 @@ func main() {
defer dbPool.Close()

repository := postgres.NewRepository(dbPool)
nbClient, err := netboxdiodeplugin.NewClient(s.Logger(), cfg.DiodeToNetBoxAPIKey)
if err != nil {
s.Logger().Error("failed to create netbox diode plugin client", "error", err)
os.Exit(1)
}

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), repository)
if err != nil {
Expand All @@ -56,7 +62,7 @@ func main() {
os.Exit(1)
}

gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository)
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), nbClient, repository)
if err != nil {
s.Logger().Error("failed to instantiate gRPC server", "error", err)
os.Exit(1)
Expand Down
5 changes: 5 additions & 0 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: RetrieveIngestionLogByExternalID :one
SELECT *
FROM ingestion_logs
WHERE external_id = $1;

-- name: RetrieveIngestionLogsWithChangeSets :many
SELECT v_ingestion_logs_with_change_set.*
FROM v_ingestion_logs_with_change_set
Expand Down
57 changes: 57 additions & 0 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package postgres
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"

"github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres"
Expand Down Expand Up @@ -56,6 +60,59 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon
return &createdIngestionLog.ID, nil
}

// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid)
func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) {
ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid)
if err != nil {
return nil, nil, mapRepositoryError(err)
}
log, err := pgToProtoIngestionLog(ingestionLog)
if err != nil {
return nil, nil, mapRepositoryError(err)
}
return &ingestionLog.ID, log, nil
}

func mapRepositoryError(err error) error {
if err == nil {
return nil
}
if errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("%w: %w", status.Errorf(codes.NotFound, "not found"), err)
}
// TODO(ltucker): pack appropriate error details ...
return err
}

func pgToProtoIngestionLog(ingestionLog postgres.IngestionLog) (*reconcilerpb.IngestionLog, error) {
entity := &diodepb.Entity{}
if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil {
return nil, fmt.Errorf("failed to unmarshal entity: %w", err)
}
var ingestionErr reconcilerpb.IngestionError
if ingestionLog.Error != nil {
if err := protojson.Unmarshal(ingestionLog.Error, &ingestionErr); err != nil {
return nil, fmt.Errorf("failed to unmarshal error: %w", err)
}
}

log := &reconcilerpb.IngestionLog{
Id: ingestionLog.ExternalID,
DataType: ingestionLog.DataType.String,
State: reconcilerpb.State(ingestionLog.State.Int32),
RequestId: ingestionLog.RequestID.String,
IngestionTs: ingestionLog.IngestionTs.Int64,
ProducerAppName: ingestionLog.ProducerAppName.String,
ProducerAppVersion: ingestionLog.ProducerAppVersion.String,
SdkName: ingestionLog.SdkName.String,
SdkVersion: ingestionLog.SdkVersion.String,
Entity: entity,
Error: &ingestionErr,
}

return log, nil
}

// UpdateIngestionLogStateWithError updates an ingestion log with a new state and error.
func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error {
params := postgres.UpdateIngestionLogStateWithErrorParams{
Expand Down
29 changes: 29 additions & 0 deletions diode-server/gen/dbstore/postgres/ingestion_logs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 377dc45

Please sign in to comment.