Skip to content

Commit

Permalink
small refactoring of reconciler flows
Browse files Browse the repository at this point in the history
  • Loading branch information
ltucker committed Dec 23, 2024
1 parent 3f46f8a commit 7efa321
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 834 deletions.
27 changes: 0 additions & 27 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,37 +111,10 @@ message RetrieveIngestionLogsResponse {
string next_page_token = 3; // Token for the next page of results, if any
}

enum ActionType {
ACTION_UNSPECIFIED = 0;
ACTION_DIFF = 1;
ACTION_APPLY = 2;
ACTION_REJECT = 3;
}

// The request to take action on an ingestion log
message ActionIngestionLogRequest {
ActionType action = 1;
string ingestion_log_id = 2;
optional string branch_id = 3;
}

// The response from the ActionIngestionLog request
message ActionIngestionLogResponse {
message Error {
string message = 1;
int32 code = 2;
}

IngestionLog log = 1;
repeated Error errors = 2;
}

// Reconciler service API
service ReconcilerService {
// Retrieves ingestion data sources
rpc RetrieveIngestionDataSources(RetrieveIngestionDataSourcesRequest) returns (RetrieveIngestionDataSourcesResponse) {}
// Retrieves ingestion logs
rpc RetrieveIngestionLogs(RetrieveIngestionLogsRequest) returns (RetrieveIngestionLogsResponse);
// Takes action on an ingestion log
rpc ActionIngestionLog(ActionIngestionLogRequest) returns (ActionIngestionLogResponse);
}
5 changes: 5 additions & 0 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: RetrieveIngestionLogByExternalID :one
SELECT *
FROM ingestion_logs
WHERE external_id = $1;

-- name: RetrieveIngestionLogsWithChangeSets :many
SELECT v_ingestion_logs_with_change_set.*
FROM v_ingestion_logs_with_change_set
Expand Down
42 changes: 42 additions & 0 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,48 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon
return &createdIngestionLog.ID, nil
}

// RetrieveIngestionLogByExternalID looks up an ingestion log using its external identifier (uuid)
func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) {
ingestionLog, err := r.queries.RetrieveIngestionLogByExternalID(ctx, uuid)
if err != nil {
return nil, nil, err
}
log, err := pgToProtoIngestionLog(ingestionLog)
if err != nil {
return nil, nil, err
}
return &ingestionLog.ID, log, nil
}

func pgToProtoIngestionLog(ingestionLog postgres.IngestionLog) (*reconcilerpb.IngestionLog, error) {
entity := &diodepb.Entity{}
if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil {
return nil, fmt.Errorf("failed to unmarshal entity: %w", err)
}
var ingestionErr reconcilerpb.IngestionError
if ingestionLog.Error != nil {
if err := protojson.Unmarshal(ingestionLog.Error, &ingestionErr); err != nil {
return nil, fmt.Errorf("failed to unmarshal error: %w", err)
}
}

log := &reconcilerpb.IngestionLog{
Id: ingestionLog.ExternalID,
DataType: ingestionLog.DataType.String,
State: reconcilerpb.State(ingestionLog.State.Int32),
RequestId: ingestionLog.RequestID.String,
IngestionTs: ingestionLog.IngestionTs.Int64,
ProducerAppName: ingestionLog.ProducerAppName.String,
ProducerAppVersion: ingestionLog.ProducerAppVersion.String,
SdkName: ingestionLog.SdkName.String,
SdkVersion: ingestionLog.SdkVersion.String,
Entity: entity,
Error: &ingestionErr,
}

return log, nil
}

// UpdateIngestionLogStateWithError updates an ingestion log with a new state and error.
func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, ingestionError *reconcilerpb.IngestionError) error {
params := postgres.UpdateIngestionLogStateWithErrorParams{
Expand Down
29 changes: 29 additions & 0 deletions diode-server/gen/dbstore/postgres/ingestion_logs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7efa321

Please sign in to comment.