Skip to content

Commit

Permalink
feat: add assume-skipped flag for first time runs
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Nov 5, 2024
1 parent a066bde commit 014e362
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
27 changes: 18 additions & 9 deletions flow/cmd/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
)

type MaintenanceCLIParams struct {
TemporalHostPort string
TemporalNamespace string
Mode string
FlowGrpcAddress string
FlowTlsEnabled bool
SkipOnApiVersionMatch bool
SkipOnNoMirrors bool
UserMaintenanceTaskQueue bool
TemporalHostPort string
TemporalNamespace string
Mode string
FlowGrpcAddress string
FlowTlsEnabled bool
SkipOnApiVersionMatch bool
SkipOnNoMirrors bool
UseMaintenanceTaskQueue bool
AssumeSkippedMaintenanceWorkflows bool
}

type StartMaintenanceResult struct {
Expand All @@ -53,11 +54,19 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error {
}

taskQueueId := shared.MaintenanceFlowTaskQueue
if !args.UserMaintenanceTaskQueue {
if !args.UseMaintenanceTaskQueue {
taskQueueId = shared.PeerFlowTaskQueue
}

if args.Mode == "start" {
if args.AssumeSkippedMaintenanceWorkflows {
slog.Info("Assuming maintenance workflows were skipped")
return WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("Assumed skipped by CLI Flag"),
CLIVersion: peerdbenv.PeerDBVersionShaShort(),
})
}
skipped, err := skipStartMaintenanceIfNeeded(ctx, args)
if err != nil {
return err
Expand Down
24 changes: 16 additions & 8 deletions flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func main() {
Sources: cli.EnvVars("USE_MAINTENANCE_TASK_QUEUE"),
}

assumedSkippedMaintenanceWorkflowsFlag := &cli.BoolFlag{
Name: "assume-skipped-workflow",
Value: false,
Usage: "Skip running maintenance workflows and simply output to catalog",
}

app := &cli.Command{
Name: "PeerDB Flows CLI",
Commands: []*cli.Command{
Expand Down Expand Up @@ -203,19 +209,21 @@ func main() {
flowGrpcAddressFlag,
flowTlsEnabledFlag,
useMaintenanceTaskQueueFlag,
assumedSkippedMaintenanceWorkflowsFlag,
},
Action: func(ctx context.Context, clicmd *cli.Command) error {
temporalHostPort := clicmd.String("temporal-host-port")

return cmd.MaintenanceMain(ctx, &cmd.MaintenanceCLIParams{
TemporalHostPort: temporalHostPort,
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
Mode: clicmd.String(maintenanceModeWorkflowFlag.Name),
SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name),
SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name),
FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name),
FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name),
UserMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
TemporalHostPort: temporalHostPort,
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
Mode: clicmd.String(maintenanceModeWorkflowFlag.Name),
SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name),
SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name),
FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name),
FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name),
UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
AssumeSkippedMaintenanceWorkflows: clicmd.Bool(assumedSkippedMaintenanceWorkflowsFlag.Name),
})
},
},
Expand Down

0 comments on commit 014e362

Please sign in to comment.