From ce78585251fc287367ba45711dea8565cf48cb85 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Sat, 2 Nov 2024 02:01:01 +0530 Subject: [PATCH] fix: nit --- flow/activities/maintenance_activity.go | 23 +++++----------- flow/cmd/mirror_status.go | 15 +---------- flow/shared/worklow.go | 26 +++++++++++++++++++ flow/workflows/maintenance_flow.go | 2 +- .../migrations/V40__maintenance_flows.sql | 4 +-- protos/route.proto | 5 ++-- 6 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 flow/shared/worklow.go diff --git a/flow/activities/maintenance_activity.go b/flow/activities/maintenance_activity.go index 387bb537b..c8b657b67 100644 --- a/flow/activities/maintenance_activity.go +++ b/flow/activities/maintenance_activity.go @@ -43,10 +43,10 @@ type MaintenanceMirrorInfoItem struct { func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMirrorsInfo, error) { rows, err := a.CatalogPool.Query(ctx, ` - select distinct on(f.name) - f.id, f.name, f.workflow_id, - f.created_at, coalesce(f.query_string, '')='' is_cdc - from flows f + select distinct on(name) + id, name, workflow_id, + created_at, coalesce(query_string, '')='' is_cdc + from flows `) if err != nil { return MaintenanceMirrorsInfo{}, err @@ -63,18 +63,7 @@ func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMir } func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirrorInfo MaintenanceMirrorInfoItem) (protos.FlowStatus, error) { - encodedState, err := a.TemporalClient.QueryWorkflow(ctx, mirrorInfo.WorkflowId, "", shared.FlowStatusQuery) - if err != nil { - slog.Error("Error querying mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err) - return protos.FlowStatus_STATUS_UNKNOWN, err - } - var state protos.FlowStatus - if err = encodedState.Get(&state); err != nil { - slog.Error("Error decoding mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err) - return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("error decoding mirror status for maintenance: %w", err) - } - return state, nil + return shared.GetWorkflowStatus(ctx, a.TemporalClient, mirrorInfo.WorkflowId) } func (a *MaintenanceActivity) WaitForRunningSnapshots(ctx context.Context) (MaintenanceMirrorsInfo, error) { @@ -212,7 +201,7 @@ func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirrorIn return true, nil } -func (a *MaintenanceActivity) CleanupBackupedFlows(ctx context.Context) error { +func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error { _, err := a.CatalogPool.Exec(ctx, ` update maintenance.maintenance_flows set state = $1, diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index da68b6457..5c9a8fede 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -406,20 +406,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) } func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) { - res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) - if err != nil { - slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) - return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) - } - var state protos.FlowStatus - err = res.Get(&state) - if err != nil { - slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) - return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) - } - return state, nil + return shared.GetWorkflowStatus(ctx, h.temporalClient, workflowID) } func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context, diff --git a/flow/shared/worklow.go b/flow/shared/worklow.go new file mode 100644 index 000000000..5843abbf1 --- /dev/null +++ b/flow/shared/worklow.go @@ -0,0 +1,26 @@ +package shared + +import ( + "context" + "fmt" + "github.com/PeerDB-io/peer-flow/generated/protos" + "go.temporal.io/sdk/client" + "log/slog" +) + +func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workflowID string) (protos.FlowStatus, error) { + res, err := temporalClient.QueryWorkflow(ctx, workflowID, "", FlowStatusQuery) + if err != nil { + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) + return protos.FlowStatus_STATUS_UNKNOWN, + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) + } + var state protos.FlowStatus + err = res.Get(&state) + if err != nil { + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) + return protos.FlowStatus_STATUS_UNKNOWN, + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) + } + return state, nil +} diff --git a/flow/workflows/maintenance_flow.go b/flow/workflows/maintenance_flow.go index 8b81d0bbb..4a84a21bd 100644 --- a/flow/workflows/maintenance_flow.go +++ b/flow/workflows/maintenance_flow.go @@ -204,7 +204,7 @@ func endMaintenance(ctx workflow.Context, logger log.Logger) (*protos.EndMainten return nil, err } - clearBackupsFuture := workflow.ExecuteActivity(ctx, maintenance.CleanupBackupedFlows) + clearBackupsFuture := workflow.ExecuteActivity(ctx, maintenance.CleanBackedUpFlows) err = clearBackupsFuture.Get(ctx, nil) if err != nil { return nil, err diff --git a/nexus/catalog/migrations/V40__maintenance_flows.sql b/nexus/catalog/migrations/V40__maintenance_flows.sql index fb1ace6ad..90e61241c 100644 --- a/nexus/catalog/migrations/V40__maintenance_flows.sql +++ b/nexus/catalog/migrations/V40__maintenance_flows.sql @@ -2,8 +2,8 @@ CREATE SCHEMA IF NOT EXISTS maintenance; CREATE TABLE IF NOT EXISTS maintenance.maintenance_flows ( - id bigint PRIMARY KEY, - flow_id serial NOT NULL, + id serial PRIMARY KEY, + flow_id bigint NOT NULL, flow_name TEXT NOT NULL, workflow_id TEXT NOT NULL, flow_created_at TIMESTAMP NOT NULL, diff --git a/protos/route.proto b/protos/route.proto index 0264c7d46..1702d5274 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -437,8 +437,9 @@ message InstanceInfoResponse { } enum MaintenanceStatus { - MAINTENANCE_STATUS_START = 0; - MAINTENANCE_STATUS_END = 1; + MAINTENANCE_STATUS_UNKOWN = 0; + MAINTENANCE_STATUS_START = 1; + MAINTENANCE_STATUS_END = 2; } message MaintenanceRequest {