From 014e36238067f2549a86343a0207b7577736553f Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Tue, 5 Nov 2024 07:50:28 +0530 Subject: [PATCH] feat: add assume-skipped flag for first time runs --- flow/cmd/maintenance.go | 27 ++++++++++++++++++--------- flow/main.go | 24 ++++++++++++++++-------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/flow/cmd/maintenance.go b/flow/cmd/maintenance.go index 435a41b63f..c3aae5387e 100644 --- a/flow/cmd/maintenance.go +++ b/flow/cmd/maintenance.go @@ -21,14 +21,15 @@ import ( ) type MaintenanceCLIParams struct { - TemporalHostPort string - TemporalNamespace string - Mode string - FlowGrpcAddress string - FlowTlsEnabled bool - SkipOnApiVersionMatch bool - SkipOnNoMirrors bool - UserMaintenanceTaskQueue bool + TemporalHostPort string + TemporalNamespace string + Mode string + FlowGrpcAddress string + FlowTlsEnabled bool + SkipOnApiVersionMatch bool + SkipOnNoMirrors bool + UseMaintenanceTaskQueue bool + AssumeSkippedMaintenanceWorkflows bool } type StartMaintenanceResult struct { @@ -53,11 +54,19 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error { } taskQueueId := shared.MaintenanceFlowTaskQueue - if !args.UserMaintenanceTaskQueue { + if !args.UseMaintenanceTaskQueue { taskQueueId = shared.PeerFlowTaskQueue } if args.Mode == "start" { + if args.AssumeSkippedMaintenanceWorkflows { + slog.Info("Assuming maintenance workflows were skipped") + return WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String("Assumed skipped by CLI Flag"), + CLIVersion: peerdbenv.PeerDBVersionShaShort(), + }) + } skipped, err := skipStartMaintenanceIfNeeded(ctx, args) if err != nil { return err diff --git a/flow/main.go b/flow/main.go index e48ed58e3a..454fb68d21 100644 --- a/flow/main.go +++ b/flow/main.go @@ -112,6 +112,12 @@ func main() { Sources: cli.EnvVars("USE_MAINTENANCE_TASK_QUEUE"), } + assumedSkippedMaintenanceWorkflowsFlag := &cli.BoolFlag{ + Name: "assume-skipped-workflow", + Value: false, + Usage: "Skip running maintenance workflows and simply output to catalog", + } + app := &cli.Command{ Name: "PeerDB Flows CLI", Commands: []*cli.Command{ @@ -203,19 +209,21 @@ func main() { flowGrpcAddressFlag, flowTlsEnabledFlag, useMaintenanceTaskQueueFlag, + assumedSkippedMaintenanceWorkflowsFlag, }, Action: func(ctx context.Context, clicmd *cli.Command) error { temporalHostPort := clicmd.String("temporal-host-port") return cmd.MaintenanceMain(ctx, &cmd.MaintenanceCLIParams{ - TemporalHostPort: temporalHostPort, - TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name), - Mode: clicmd.String(maintenanceModeWorkflowFlag.Name), - SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name), - SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name), - FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name), - FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name), - UserMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name), + TemporalHostPort: temporalHostPort, + TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name), + Mode: clicmd.String(maintenanceModeWorkflowFlag.Name), + SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name), + SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name), + FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name), + FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name), + UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name), + AssumeSkippedMaintenanceWorkflows: clicmd.Bool(assumedSkippedMaintenanceWorkflowsFlag.Name), }) }, },