From eafd27014921e00f770124388a03093b794af14a Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Nov 2024 20:28:45 +0530 Subject: [PATCH] chore: review comments pt --- flow/activities/maintenance_activity.go | 44 ++++++++----------- flow/workflows/maintenance_flow.go | 5 +-- .../migrations/V40__maintenance_flows.sql | 8 ++-- 3 files changed, 23 insertions(+), 34 deletions(-) diff --git a/flow/activities/maintenance_activity.go b/flow/activities/maintenance_activity.go index aaf22e6385..387bb537b6 100644 --- a/flow/activities/maintenance_activity.go +++ b/flow/activities/maintenance_activity.go @@ -42,8 +42,7 @@ type MaintenanceMirrorInfoItem struct { } func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMirrorsInfo, error) { - rows, err := a.CatalogPool.Query(ctx, - ` + rows, err := a.CatalogPool.Query(ctx, ` select distinct on(f.name) f.id, f.name, f.workflow_id, f.created_at, coalesce(f.query_string, '')='' is_cdc @@ -70,8 +69,7 @@ func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirrorInfo Ma return protos.FlowStatus_STATUS_UNKNOWN, err } var state protos.FlowStatus - err = encodedState.Get(&state) - if err != nil { + if err = encodedState.Get(&state); err != nil { slog.Error("Error decoding mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err) return protos.FlowStatus_STATUS_UNKNOWN, fmt.Errorf("error decoding mirror status for maintenance: %w", err) @@ -125,22 +123,18 @@ func (a *MaintenanceActivity) checkAndWaitIfSnapshot( } slog.Info("Waiting for mirror to finish snapshot", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String()) - defer shared.Interval( - ctx, alertEvery, func() { - slog.Warn("[Maintenance] Still waiting for mirror to finish snapshot", - "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String()) - a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, mirrorInfo.Name, fmt.Sprintf( - "Maintenance mode is still waiting for mirror to finish snapshot, mirror=%s, workflowId=%s, status=%s", - mirrorInfo.Name, mirrorInfo.WorkflowId, mirrorStatus)) - }, - )() - - defer shared.Interval( - ctx, logEvery, func() { - slog.Info("[Maintenance] Waiting for mirror to finish snapshot", - "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String()) - }, - )() + defer shared.Interval(ctx, alertEvery, func() { + slog.Warn("[Maintenance] Still waiting for mirror to finish snapshot", + "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String()) + a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, mirrorInfo.Name, fmt.Sprintf( + "Maintenance mode is still waiting for mirror to finish snapshot, mirror=%s, workflowId=%s, status=%s", + mirrorInfo.Name, mirrorInfo.WorkflowId, mirrorStatus)) + })() + + defer shared.Interval(ctx, logEvery, func() { + slog.Info("[Maintenance] Waiting for mirror to finish snapshot", + "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String()) + })() snapshotWaitSleepInterval := 10 * time.Second for mirrorStatus == protos.FlowStatus_STATUS_SNAPSHOT || mirrorStatus == protos.FlowStatus_STATUS_SETUP { @@ -200,12 +194,10 @@ func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirrorIn "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err) return false, err } - defer shared.Interval( - ctx, 30*time.Second, func() { - slog.Info("Waiting for mirror to pause", - "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "currentStatus", mirrorStatus.String()) - }, - )() + defer shared.Interval(ctx, 30*time.Second, func() { + slog.Info("Waiting for mirror to pause", + "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "currentStatus", mirrorStatus.String()) + })() for mirrorStatus != protos.FlowStatus_STATUS_PAUSED { time.Sleep(10 * time.Second) diff --git a/flow/workflows/maintenance_flow.go b/flow/workflows/maintenance_flow.go index da98acb0f7..8b81d0bbb6 100644 --- a/flow/workflows/maintenance_flow.go +++ b/flow/workflows/maintenance_flow.go @@ -139,11 +139,10 @@ func pauseAndGetRunningMirrors( selector := workflow.NewSelector(ctx) runningMirrors := make([]bool, len(mirrorsList.Mirrors)) for i, mirror := range mirrorsList.Mirrors { - activityInput := mirror f := workflow.ExecuteActivity( ctx, maintenance.PauseMirrorIfRunning, - activityInput, + mirror, ) selector.AddFuture(f, func(f workflow.Future) { @@ -155,7 +154,6 @@ func pauseAndGetRunningMirrors( logger.Info("Finished check and pause for mirror", "mirror", mirror, "wasRunning", wasRunning) runningMirrors[i] = wasRunning } - ctx.Done() }) } onlyRunningMirrors := make([]activities.MaintenanceMirrorInfoItem, 0) @@ -250,7 +248,6 @@ func resumeBackedUpMirrors(ctx workflow.Context, logger log.Logger) (activities. } else { logger.Info("Finished resuming mirror", "mirror", mirror) } - ctx.Done() }) } diff --git a/nexus/catalog/migrations/V40__maintenance_flows.sql b/nexus/catalog/migrations/V40__maintenance_flows.sql index fae7315151..fb1ace6ad2 100644 --- a/nexus/catalog/migrations/V40__maintenance_flows.sql +++ b/nexus/catalog/migrations/V40__maintenance_flows.sql @@ -2,14 +2,14 @@ CREATE SCHEMA IF NOT EXISTS maintenance; CREATE TABLE IF NOT EXISTS maintenance.maintenance_flows ( - id SERIAL PRIMARY KEY, + id bigint PRIMARY KEY, flow_id serial NOT NULL, flow_name TEXT NOT NULL, workflow_id TEXT NOT NULL, flow_created_at TIMESTAMP NOT NULL, is_cdc BOOLEAN NOT NULL, state TEXT NOT NULL, - restored_at TIMESTAMP DEFAULT NULL, - from_version TEXT DEFAULT NULL, - to_version TEXT DEFAULT NULL + restored_at TIMESTAMP, + from_version TEXT, + to_version TEXT );