Skip to content

Commit

Permalink
chore: review comments pt
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Nov 1, 2024
1 parent 3a28a87 commit eafd270
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 34 deletions.
44 changes: 18 additions & 26 deletions flow/activities/maintenance_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ type MaintenanceMirrorInfoItem struct {
}

func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMirrorsInfo, error) {
rows, err := a.CatalogPool.Query(ctx,
`
rows, err := a.CatalogPool.Query(ctx, `
select distinct on(f.name)
f.id, f.name, f.workflow_id,
f.created_at, coalesce(f.query_string, '')='' is_cdc
Expand All @@ -70,8 +69,7 @@ func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirrorInfo Ma
return protos.FlowStatus_STATUS_UNKNOWN, err
}
var state protos.FlowStatus
err = encodedState.Get(&state)
if err != nil {
if err = encodedState.Get(&state); err != nil {
slog.Error("Error decoding mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err)
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("error decoding mirror status for maintenance: %w", err)
Expand Down Expand Up @@ -125,22 +123,18 @@ func (a *MaintenanceActivity) checkAndWaitIfSnapshot(
}
slog.Info("Waiting for mirror to finish snapshot",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String())
defer shared.Interval(
ctx, alertEvery, func() {
slog.Warn("[Maintenance] Still waiting for mirror to finish snapshot",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String())
a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, mirrorInfo.Name, fmt.Sprintf(
"Maintenance mode is still waiting for mirror to finish snapshot, mirror=%s, workflowId=%s, status=%s",
mirrorInfo.Name, mirrorInfo.WorkflowId, mirrorStatus))
},
)()

defer shared.Interval(
ctx, logEvery, func() {
slog.Info("[Maintenance] Waiting for mirror to finish snapshot",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String())
},
)()
defer shared.Interval(ctx, alertEvery, func() {
slog.Warn("[Maintenance] Still waiting for mirror to finish snapshot",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String())
a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, mirrorInfo.Name, fmt.Sprintf(
"Maintenance mode is still waiting for mirror to finish snapshot, mirror=%s, workflowId=%s, status=%s",
mirrorInfo.Name, mirrorInfo.WorkflowId, mirrorStatus))
})()

defer shared.Interval(ctx, logEvery, func() {
slog.Info("[Maintenance] Waiting for mirror to finish snapshot",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "status", mirrorStatus.String())
})()

snapshotWaitSleepInterval := 10 * time.Second
for mirrorStatus == protos.FlowStatus_STATUS_SNAPSHOT || mirrorStatus == protos.FlowStatus_STATUS_SETUP {
Expand Down Expand Up @@ -200,12 +194,10 @@ func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirrorIn
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err)
return false, err
}
defer shared.Interval(
ctx, 30*time.Second, func() {
slog.Info("Waiting for mirror to pause",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "currentStatus", mirrorStatus.String())
},
)()
defer shared.Interval(ctx, 30*time.Second, func() {
slog.Info("Waiting for mirror to pause",
"mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "currentStatus", mirrorStatus.String())
})()

for mirrorStatus != protos.FlowStatus_STATUS_PAUSED {
time.Sleep(10 * time.Second)
Expand Down
5 changes: 1 addition & 4 deletions flow/workflows/maintenance_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ func pauseAndGetRunningMirrors(
selector := workflow.NewSelector(ctx)
runningMirrors := make([]bool, len(mirrorsList.Mirrors))
for i, mirror := range mirrorsList.Mirrors {
activityInput := mirror
f := workflow.ExecuteActivity(
ctx,
maintenance.PauseMirrorIfRunning,
activityInput,
mirror,
)

selector.AddFuture(f, func(f workflow.Future) {
Expand All @@ -155,7 +154,6 @@ func pauseAndGetRunningMirrors(
logger.Info("Finished check and pause for mirror", "mirror", mirror, "wasRunning", wasRunning)
runningMirrors[i] = wasRunning
}
ctx.Done()
})
}
onlyRunningMirrors := make([]activities.MaintenanceMirrorInfoItem, 0)
Expand Down Expand Up @@ -250,7 +248,6 @@ func resumeBackedUpMirrors(ctx workflow.Context, logger log.Logger) (activities.
} else {
logger.Info("Finished resuming mirror", "mirror", mirror)
}
ctx.Done()
})
}

Expand Down
8 changes: 4 additions & 4 deletions nexus/catalog/migrations/V40__maintenance_flows.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ CREATE SCHEMA IF NOT EXISTS maintenance;

CREATE TABLE IF NOT EXISTS maintenance.maintenance_flows
(
id SERIAL PRIMARY KEY,
id bigint PRIMARY KEY,
flow_id serial NOT NULL,
flow_name TEXT NOT NULL,
workflow_id TEXT NOT NULL,
flow_created_at TIMESTAMP NOT NULL,
is_cdc BOOLEAN NOT NULL,
state TEXT NOT NULL,
restored_at TIMESTAMP DEFAULT NULL,
from_version TEXT DEFAULT NULL,
to_version TEXT DEFAULT NULL
restored_at TIMESTAMP,
from_version TEXT,
to_version TEXT
);

0 comments on commit eafd270

Please sign in to comment.