Skip to content

Commit

Permalink
chore: ingestion processing cleanup (#209)
Browse files Browse the repository at this point in the history
Co-authored-by: Michal Fiedorowicz <[email protected]>
  • Loading branch information
ltucker and mfiedorowicz authored Dec 23, 2024
1 parent 99c7000 commit 252498e
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 250 deletions.
5 changes: 5 additions & 0 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: RetrieveIngestionLogByExternalID :one
SELECT *
FROM ingestion_logs
WHERE external_id = $1;

-- name: RetrieveIngestionLogsWithChangeSets :many
SELECT v_ingestion_logs_with_change_set.*
FROM v_ingestion_logs_with_change_set
Expand Down
13 changes: 13 additions & 0 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon
return &createdIngestionLog.ID, nil
}

// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid)
func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) {
ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid)
if err != nil {
return nil, nil, err
}
log, err := ingestionLog.ToProto()
if err != nil {
return nil, nil, err
}
return &ingestionLog.ID, log, nil
}

// UpdateIngestionLogStateWithError updates an ingestion log with a new state and error.
func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error {
params := postgres.UpdateIngestionLogStateWithErrorParams{
Expand Down
40 changes: 40 additions & 0 deletions diode-server/gen/dbstore/postgres/adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package postgres

import (
"fmt"

"google.golang.org/protobuf/encoding/protojson"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
)

// ToProto converts sqlc structure to analogous protobuf
func (log IngestionLog) ToProto() (*reconcilerpb.IngestionLog, error) {
entity := &diodepb.Entity{}
if err := protojson.Unmarshal(log.Entity, entity); err != nil {
return nil, fmt.Errorf("failed to unmarshal entity: %w", err)
}
var ingestionErr reconcilerpb.IngestionError
if log.Error != nil {
if err := protojson.Unmarshal(log.Error, &ingestionErr); err != nil {
return nil, fmt.Errorf("failed to unmarshal error: %w", err)
}
}

pblog := &reconcilerpb.IngestionLog{
Id: log.ExternalID,
DataType: log.DataType.String,
State: reconcilerpb.State(log.State.Int32),
RequestId: log.RequestID.String,
IngestionTs: log.IngestionTs.Int64,
ProducerAppName: log.ProducerAppName.String,
ProducerAppVersion: log.ProducerAppVersion.String,
SdkName: log.SdkName.String,
SdkVersion: log.SdkVersion.String,
Entity: entity,
Error: &ingestionErr,
}

return pblog, nil
}
29 changes: 29 additions & 0 deletions diode-server/gen/dbstore/postgres/ingestion_logs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 252498e

Please sign in to comment.