Skip to content

Commit

Permalink
feat: add new task queue and orchestration for upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Nov 1, 2024
1 parent c44ddf8 commit 3a28a87
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 23 deletions.
8 changes: 6 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,12 @@ func (h *FlowRequestHandler) GetInstanceInfo(ctx context.Context, in *protos.Ins
}

func (h *FlowRequestHandler) Maintenance(ctx context.Context, in *protos.MaintenanceRequest) (*protos.MaintenanceResponse, error) {
taskQueueId := shared.MaintenanceFlowTaskQueue
if in.UsePeerflowTaskQueue {
taskQueueId = shared.PeerFlowTaskQueue
}
if in.Status == protos.MaintenanceStatus_MAINTENANCE_STATUS_START {
workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, h.temporalClient, &protos.StartMaintenanceFlowInput{})
workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, h.temporalClient, &protos.StartMaintenanceFlowInput{}, taskQueueId)
if err != nil {
return nil, err
}
Expand All @@ -579,7 +583,7 @@ func (h *FlowRequestHandler) Maintenance(ctx context.Context, in *protos.Mainten
}

if in.Status == protos.MaintenanceStatus_MAINTENANCE_STATUS_END {
workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, h.temporalClient, &protos.EndMaintenanceFlowInput{})
workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, h.temporalClient, &protos.EndMaintenanceFlowInput{}, taskQueueId)
if err != nil {
return nil, err
}
Expand Down
165 changes: 153 additions & 12 deletions flow/cmd/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,42 @@ package cmd

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"

"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

// MaintenanceCLIParams carries the command-line options of the maintenance
// command into MaintenanceMain.
//
// NOTE(review): TemporalHostPort and TemporalNamespace are declared twice and
// FlowType sits next to Mode; the first three fields look like pre-change lines
// of a rendered diff rather than intended members — confirm against the real file.
type MaintenanceCLIParams struct {
TemporalHostPort string
TemporalNamespace string
// FlowType is switched on by the older form of MaintenanceMain ("start" / "end").
FlowType string
// TemporalHostPort is filled from the "temporal-host-port" flag;
// presumably the address used to build the Temporal client — TODO confirm.
TemporalHostPort string
// TemporalNamespace is filled from the Temporal namespace flag.
TemporalNamespace string
// Mode selects the workflow to run: "start" or "end". MaintenanceMain
// returns an error for any other value.
Mode string
// OutputFile is the path the start run writes its StartMaintenanceResult to
// (as JSON). An empty value disables writing.
OutputFile string
// InputFile is the path the end run reads a StartMaintenanceResult from. If
// that result is marked as skipped, the end workflow is skipped too. An empty
// value disables reading.
InputFile string
// FlowGrpcAddress is the address of the flow gRPC server. It is required
// when SkipOnApiVersionMatch or SkipOnNoMirrors is set.
FlowGrpcAddress string
// FlowTlsEnabled selects TLS transport credentials for the flow gRPC
// connection; when false, insecure credentials are used.
FlowTlsEnabled bool
// SkipOnApiVersionMatch skips the start workflow when the version reported
// by the flow API equals this binary's version.
SkipOnApiVersionMatch bool
// SkipOnNoMirrors skips the start workflow when the flow API lists no mirrors.
SkipOnNoMirrors bool
// UserMaintenanceTaskQueue is filled from the "use-maintenance-task-queue"
// flag. When true the workflows are started on shared.MaintenanceFlowTaskQueue,
// otherwise on shared.PeerFlowTaskQueue.
UserMaintenanceTaskQueue bool
}

// StartMaintenanceResult is the JSON record of a "start" maintenance run. It is
// written by WriteMaintenanceOutput and read back by ReadMaintenanceInput so the
// "end" run can tell whether the start run was skipped. Every field is omitted
// from the JSON when it holds its zero value.
type StartMaintenanceResult struct {
// APIVersion is the version reported by the flow API; it is only set when
// the run was skipped because the versions matched.
APIVersion string `json:"apiVersion,omitempty"`
// CLIVersion is the version of the binary that produced the result
// (peerdbenv.PeerDBVersionShaShort()).
CLIVersion string `json:"cliVersion,omitempty"`
// SkippedReason is a human-readable explanation of why the run was skipped.
SkippedReason string `json:"skippedReason,omitempty"`
// Skipped reports whether the start maintenance workflow was skipped.
Skipped bool `json:"skipped,omitempty"`
}

// MaintenanceMain is the entry point for the maintenance command, requires access to Temporal client, will exit after
Expand All @@ -33,10 +54,21 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error {
return fmt.Errorf("unable to create Temporal client: %w", err)
}

switch args.FlowType {
case "start":
taskQueueId := shared.MaintenanceFlowTaskQueue
if !args.UserMaintenanceTaskQueue {
taskQueueId = shared.PeerFlowTaskQueue
}

if args.Mode == "start" {
skipped, err := skipStartMaintenanceIfNeeded(ctx, args)
if err != nil {
return err
}
if skipped {
return nil
}
slog.Info("Running start maintenance workflow")
workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, tc, &protos.StartMaintenanceFlowInput{})
workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, tc, &protos.StartMaintenanceFlowInput{}, taskQueueId)
if err != nil {
slog.Error("Error running start maintenance workflow", "error", err)
return err
Expand All @@ -48,10 +80,22 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error {
return err
}
slog.Info("Start maintenance workflow completed", "output", output)
return nil
case "end":
return WriteMaintenanceOutput(args.OutputFile, StartMaintenanceResult{
Skipped: false,
CLIVersion: peerdbenv.PeerDBVersionShaShort(),
})
} else if args.Mode == "end" {
if input, err := ReadMaintenanceInput(args.InputFile); input != nil || err != nil {
if err != nil {
return err
}
if input.Skipped {
slog.Info("Skipping end maintenance workflow as start maintenance was skipped", "reason", input.SkippedReason)
return nil
}
}
slog.Info("Running end maintenance workflow")
workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, tc, &protos.EndMaintenanceFlowInput{})
workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, tc, &protos.EndMaintenanceFlowInput{}, taskQueueId)
if err != nil {
slog.Error("Error running end maintenance workflow", "error", err)
return err
Expand All @@ -63,9 +107,106 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error {
return err
}
slog.Info("End maintenance workflow completed", "output", output)
default:
return fmt.Errorf("unknown flow type %s", args.FlowType)
} else {
return fmt.Errorf("unknown flow type %s", args.Mode)
}
slog.Info("Maintenance workflow completed with type", "type", args.Mode)
return nil
}

// skipStartMaintenanceIfNeeded reports whether the start maintenance workflow
// should be skipped, based on the SkipOnApiVersionMatch and SkipOnNoMirrors
// options in args.
//
// When neither option is set it returns (false, nil) without contacting the
// flow API. Otherwise args.FlowGrpcAddress must be non-empty; a gRPC client is
// built for it (TLS with a minimum of TLS 1.2 when args.FlowTlsEnabled is set,
// insecure credentials otherwise) and the checks run in this order:
//   - SkipOnApiVersionMatch: skip when the version returned by the flow API
//     equals peerdbenv.PeerDBVersionShaShort().
//   - SkipOnNoMirrors: skip when the flow API lists no mirrors.
//
// On a skip, the reason is recorded via WriteMaintenanceOutput to
// args.OutputFile and the function returns true together with any error from
// that write. Errors from building the client or from either API call are
// returned with a false result.
func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParams) (bool, error) {
	if !args.SkipOnApiVersionMatch && !args.SkipOnNoMirrors {
		return false, nil
	}
	if args.FlowGrpcAddress == "" {
		return false, errors.New("flow address is required when skipping based on API")
	}

	slog.Info("Constructing flow client")
	transportCredentials := credentials.NewTLS(&tls.Config{
		MinVersion: tls.VersionTLS12,
	})
	if !args.FlowTlsEnabled {
		transportCredentials = insecure.NewCredentials()
	}
	conn, err := grpc.NewClient(args.FlowGrpcAddress,
		grpc.WithTransportCredentials(transportCredentials),
	)
	if err != nil {
		return false, fmt.Errorf("unable to dial grpc flow server: %w", err)
	}
	// The connection is only needed for the checks below; release it on every
	// return path instead of leaking it for the rest of the process lifetime.
	defer conn.Close()
	peerFlowClient := protos.NewFlowServiceClient(conn)

	if args.SkipOnApiVersionMatch {
		cliVersion := peerdbenv.PeerDBVersionShaShort()
		slog.Info("Checking if CLI version matches API version", "cliVersion", cliVersion)
		version, err := peerFlowClient.GetVersion(ctx, &protos.PeerDBVersionRequest{})
		if err != nil {
			return false, err
		}
		slog.Info("Got version from flow", "version", version.Version)
		if version.Version == cliVersion {
			slog.Info("Skipping maintenance workflow due to matching versions")
			return true, WriteMaintenanceOutput(args.OutputFile, StartMaintenanceResult{
				Skipped:       true,
				SkippedReason: fmt.Sprintf("CLI version %s matches API version %s", cliVersion, version.Version),
				APIVersion:    version.Version,
				CLIVersion:    cliVersion,
			})
		}
	}

	if args.SkipOnNoMirrors {
		slog.Info("Checking if there are any mirrors")
		mirrors, err := peerFlowClient.ListMirrors(ctx, &protos.ListMirrorsRequest{})
		if err != nil {
			return false, err
		}
		slog.Info("Got mirrors from flow", "mirrors", mirrors.Mirrors)
		if len(mirrors.Mirrors) == 0 {
			slog.Info("Skipping maintenance workflow due to no mirrors")
			return true, WriteMaintenanceOutput(args.OutputFile, StartMaintenanceResult{
				Skipped:       true,
				SkippedReason: "No mirrors found",
			})
		}
	}

	return false, nil
}

// WriteMaintenanceOutput serializes result as JSON and writes it to outputFile,
// creating the file if needed and replacing any previous contents. An empty
// outputFile means no output was requested and the call is a no-op.
//
// It returns an error when the result cannot be marshalled or the file cannot
// be created, written or closed.
func WriteMaintenanceOutput(outputFile string, result StartMaintenanceResult) error {
	if outputFile == "" {
		return nil
	}
	slog.Info("Writing maintenance result to file", "file", outputFile, "result", result)

	// Marshal before touching the filesystem so that an encoding failure cannot
	// leave an empty or truncated output file behind.
	marshalledResult, err := json.Marshal(result)
	if err != nil {
		return fmt.Errorf("unable to marshal maintenance result: %w", err)
	}

	// os.WriteFile creates/truncates with the same 0666 (pre-umask) mode as
	// os.Create and, unlike a deferred Close, surfaces errors from closing the
	// file, which is where a failed flush of written data would show up.
	if err := os.WriteFile(outputFile, marshalledResult, 0o666); err != nil {
		return fmt.Errorf("unable to write to output file: %w", err)
	}

	return nil
}

// ReadMaintenanceInput loads a StartMaintenanceResult from the JSON document
// stored in inputFile.
//
// An empty inputFile means no input was supplied: the function then returns a
// nil result and a nil error. Failures to open or decode the file are returned
// as wrapped errors with a nil result.
func ReadMaintenanceInput(inputFile string) (*StartMaintenanceResult, error) {
	if len(inputFile) == 0 {
		return nil, nil
	}

	slog.Info("Reading maintenance input from file", "file", inputFile)
	src, openErr := os.Open(inputFile)
	if openErr != nil {
		return nil, fmt.Errorf("unable to open input file: %w", openErr)
	}
	defer src.Close()

	// Decode the first JSON value found in the file straight into the result.
	result := new(StartMaintenanceResult)
	if decodeErr := json.NewDecoder(src).Decode(result); decodeErr != nil {
		return nil, fmt.Errorf("unable to decode input file: %w", decodeErr)
	}
	return result, nil
}
8 changes: 6 additions & 2 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type WorkerSetupOptions struct {
TemporalMaxConcurrentWorkflowTasks int
EnableProfiling bool
EnableOtelMetrics bool
UseMaintenanceTaskQueue bool
}

type workerSetupResponse struct {
Expand Down Expand Up @@ -124,8 +125,11 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
return nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")

taskQueue := peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
queueId := shared.PeerFlowTaskQueue
if opts.UseMaintenanceTaskQueue {
queueId = shared.MaintenanceFlowTaskQueue
}
taskQueue := peerdbenv.PeerFlowTaskQueueName(queueId)
slog.Info(
fmt.Sprintf("Creating temporal worker for queue %v: %v workflow workers %v activity workers",
taskQueue,
Expand Down
71 changes: 68 additions & 3 deletions flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,55 @@ func main() {
Sources: cli.EnvVars("RUN_MAINTENANCE_FLOW"),
}

maintenanceSkipOnApiVersionMatchFlag := &cli.BoolFlag{
Name: "skip-on-api-version-match",
Value: false,
Usage: "Skip maintenance flow if the API version matches",
Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_API_VERSION_MATCH"),
}

maintenanceSkipOnNoMirrorsFlag := &cli.BoolFlag{
Name: "skip-on-no-mirrors",
Value: false,
Usage: "Skip maintenance flow if there are no mirrors",
Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_MIRRORS"),
}

flowGrpcAddressFlag := &cli.StringFlag{
Name: "flow-grpc-address",
Value: "",
Usage: "Address of the flow gRPC server",
Sources: cli.EnvVars("FLOW_GRPC_ADDRESS"),
}

flowTlsEnabledFlag := &cli.BoolFlag{
Name: "flow-tls-enabled",
Value: false,
Usage: "Enable TLS for the flow gRPC server",
Sources: cli.EnvVars("FLOW_TLS_ENABLED"),
}

maintenanceOutputFileFlag := &cli.StringFlag{
Name: "output-file",
Value: "",
Usage: "Output file for maintenance flow, this is used for `start` maintenance flow",
Sources: cli.EnvVars("MAINTENANCE_OUTPUT_FILE"),
}

mainenanceInputFileFlag := &cli.StringFlag{
Name: "input-file",
Value: "",
Usage: "Input file for maintenance flow, this is used for `end` maintenance flow",
Sources: cli.EnvVars("MAINTENANCE_INPUT_FILE"),
}

useMaintenanceTaskQueueFlag := &cli.BoolFlag{
Name: "use-maintenance-task-queue",
Value: false,
Usage: "Use the maintenance task queue for the worker",
Sources: cli.EnvVars("USE_MAINTENANCE_TASK_QUEUE"),
}

app := &cli.Command{
Name: "PeerDB Flows CLI",
Commands: []*cli.Command{
Expand All @@ -92,6 +141,7 @@ func main() {
TemporalNamespace: clicmd.String("temporal-namespace"),
TemporalMaxConcurrentActivities: int(clicmd.Int("temporal-max-concurrent-activities")),
TemporalMaxConcurrentWorkflowTasks: int(clicmd.Int("temporal-max-concurrent-workflow-tasks")),
UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
})
if err != nil {
return err
Expand All @@ -107,6 +157,7 @@ func main() {
temporalNamespaceFlag,
temporalMaxConcurrentActivitiesFlag,
temporalMaxConcurrentWorkflowTasksFlag,
useMaintenanceTaskQueueFlag,
},
},
{
Expand Down Expand Up @@ -161,14 +212,28 @@ func main() {
temporalHostPortFlag,
temporalNamespaceFlag,
maintenanceModeWorkflowFlag,
maintenanceSkipOnApiVersionMatchFlag,
maintenanceSkipOnNoMirrorsFlag,
flowGrpcAddressFlag,
flowTlsEnabledFlag,
maintenanceOutputFileFlag,
mainenanceInputFileFlag,
useMaintenanceTaskQueueFlag,
},
Action: func(ctx context.Context, clicmd *cli.Command) error {
temporalHostPort := clicmd.String("temporal-host-port")

return cmd.MaintenanceMain(ctx, &cmd.MaintenanceCLIParams{
TemporalHostPort: temporalHostPort,
TemporalNamespace: clicmd.String("temporal-namespace"),
FlowType: clicmd.String("run-maintenance-flow"),
TemporalHostPort: temporalHostPort,
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
Mode: clicmd.String(maintenanceModeWorkflowFlag.Name),
SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name),
SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name),
FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name),
FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name),
OutputFile: clicmd.String(maintenanceOutputFileFlag.Name),
InputFile: clicmd.String(mainenanceInputFileFlag.Name),
UserMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
})
},
},
Expand Down
5 changes: 3 additions & 2 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type (

const (
// Task Queues
PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue"
SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue"
PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue"
SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue"
MaintenanceFlowTaskQueue TaskQueueID = "maintenance-flow-task-queue"

// Queries
CDCFlowStateQuery = "q-cdc-flow-state"
Expand Down
6 changes: 4 additions & 2 deletions flow/workflows/maintenance_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ func RunStartMaintenanceWorkflow(
ctx context.Context,
temporalClient client.Client,
input *protos.StartMaintenanceFlowInput,
taskQueueId shared.TaskQueueID,
) (client.WorkflowRun, error) {
startWorkflowOptions := client.StartWorkflowOptions{
// This is to ensure that maintenance workflows are deduped
WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
TaskQueue: peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue),
TaskQueue: peerdbenv.PeerFlowTaskQueueName(taskQueueId),
}
startWorkflowOptions.ID = "start-maintenance"
if deploymentUid := peerdbenv.PeerDBDeploymentUID(); deploymentUid != "" {
Expand All @@ -44,12 +45,13 @@ func RunEndMaintenanceWorkflow(
ctx context.Context,
temporalClient client.Client,
input *protos.EndMaintenanceFlowInput,
taskQueueId shared.TaskQueueID,
) (client.WorkflowRun, error) {
startWorkflowOptions := client.StartWorkflowOptions{
// This is to ensure that maintenance workflows are deduped
WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
TaskQueue: peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue),
TaskQueue: peerdbenv.PeerFlowTaskQueueName(taskQueueId),
}
startWorkflowOptions.ID = "end-maintenance"
if deploymentUid := peerdbenv.PeerDBDeploymentUID(); deploymentUid != "" {
Expand Down
Loading

0 comments on commit 3a28a87

Please sign in to comment.