From 48eb0c100d18bbb36ce251827ed3e77c6ab97c35 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 1 Jun 2023 15:05:14 -0700 Subject: [PATCH 1/8] untested, event printing mostly in Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 28 ++++++++++ go.mod | 1 + pkg/common/flyte_url.go | 9 ++++ pkg/manager/impl/execution_manager.go | 60 ++++++++++++++++++++++ pkg/manager/impl/task_execution_manager.go | 55 ++++++++++++++++++++ 5 files changed, 153 insertions(+) diff --git a/dataproxy/service.go b/dataproxy/service.go index a62d37b43..604152bb1 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,6 +5,7 @@ import ( "encoding/base32" "encoding/base64" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "net/url" "reflect" "strconv" @@ -95,10 +96,37 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create a signed url. Error: %v", err) } + // The artifact returned here has no relevant entry in the admin database, so call the artifact service synchronously + // to persist the information. + artifactCreate := &artifact.CreateArtifactRequest{ + ArtifactKey: &core.ArtifactKey{ + Project: req.Project, + Domain: req.Domain, + }, + Version: string(req.ContentMd5), + Uri: "", + Spec: req.GetArtifactSpec(), + } + fmt.Printf("Will call artifact service with request: %v\n", artifactCreate) + // artifact := artifact_service.CreateArtifact + return &service.CreateUploadLocationResponse{ SignedUrl: resp.URL.String(), NativeUrl: storagePath.String(), ExpiresAt: timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())), + // replace with created artifact + Artifact: &artifact.Artifact{ + ArtifactId: &core.ArtifactID{ + ArtifactKey: &core.ArtifactKey{ + Project: req.Project, + Domain: req.Domain, + Name: fmt.Sprintf("autogenerated.%s", req.Filename), + }, + Version: string(req.ContentMd5), + }, + Uri: "flyte://random/returned/url", + Spec: req.GetArtifactSpec(), + }, }, nil } diff --git a/go.mod b/go.mod index 46ef4b0de..77b035563 100644 --- a/go.mod +++ b/go.mod @@ -209,6 +209,7 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a +replace github.com/flyteorg/flyteidl => ../flyteidl // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index a78689f36..1b87d05f3 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -152,3 +152,12 @@ func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, } return res } + +// AppendLinksForLiteralMap returns a map of URLs for each output in the supplied literal map. +func AppendLinksForLiteralMap(baseOutputURL string, literalMap core.LiteralMap) map[string]string { + res := make(map[string]string, len(literalMap.Literals)) + for k, _ := range literalMap.Literals { + res[k] = fmt.Sprintf("%s/%s", baseOutputURL, k) + } + return res +} diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 8a6b7287b..b5bc4a8cf 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -3,6 +3,8 @@ package impl import ( "context" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "strconv" "time" @@ -1195,6 +1197,63 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime( watch.Observe(*executionModel.ExecutionCreatedAt, terminalEventTime) } +func (m *ExecutionManager) tempHandleArtifactEventEmitting(ctx context.Context, request admin.WorkflowExecutionEventRequest) { + // Print out what the catalog service will eventually do. Can all this be retrieved just from the raw event? No. + // Missing: Artifact name and other info declared in the wf decorator. + var outputs *core.LiteralMap + var err error + if request.Event.GetOutputData() != nil { + fmt.Printf("Got output data") + outputs = request.Event.GetOutputData() + } else if len(request.Event.GetOutputUri()) > 0 { + fmt.Printf("Got output URI") + // GetInputs actually fetches the data, even though this is an output + outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), + m.storageClient, request.Event.GetOutputUri()) + if err != nil { + fmt.Printf("Error fetching output literal map %v", request.Event) + } + } else { + fmt.Printf("No output data found for %v\n", request.Event) + } + if outputs != nil { + nodeExecutionID := core.NodeExecutionIdentifier{ + NodeId: "end-node", + ExecutionId: request.Event.ExecutionId, + } + urls := common.FlyteURLsFromNodeExecutionID(nodeExecutionID, false) + outputURLs := common.AppendLinksForLiteralMap(urls.GetInputs(), *outputs) + + for k, v := range outputs.Literals { + as := artifact.ArtifactSpec{ + Value: v, + // Type, tags and aliases will need to be filled in later + Source: &artifact.ArtifactSpec_Execution{ + Execution: request.Event.ExecutionId, + }, + } + ak := core.ArtifactKey{ + Project: request.Event.ExecutionId.Project, + Domain: request.Event.ExecutionId.Domain, + // This will need to be filled in later, will need to pull from task template, or set to + // something pretty unique, like the task ID. + Name: "", + } + + a := artifact.CreateArtifactRequest{ + ArtifactKey: &ak, + Version: request.Event.ExecutionId.Name, + Uri: outputURLs[k], + Spec: &as, + } + e := event.ArtifactCreateEvent{ + CreateRequest: &a, + } + print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) + } + } +} + func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) ( *admin.WorkflowExecutionEventResponse, error) { err := validation.ValidateCreateWorkflowEventRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes) @@ -1280,6 +1339,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi if request.Event.GetOutputData() != nil { m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } + m.tempHandleArtifactEventEmitting(ctx, request) err = m.publishNotifications(ctx, request, *executionModel) if err != nil { diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 46967f264..832cee54c 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -3,6 +3,7 @@ package impl import ( "context" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "strconv" cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces" @@ -31,6 +32,7 @@ import ( runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/logger" "google.golang.org/grpc/codes" ) @@ -129,6 +131,58 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState( return *existingTaskExecution, nil } +func (m *TaskExecutionManager) tempHandleArtifactEventEmitting(ctx context.Context, request admin.TaskExecutionEventRequest, taskExecutionID core.TaskExecutionIdentifier) { + // Print out what the catalog service will eventually do. Can all this be retrieved just from the raw event? No. + // Missing is: tags/aliases/given name + var outputs *core.LiteralMap + var err error + if request.Event.GetOutputData() != nil { + fmt.Printf("Got output data") + outputs = request.Event.GetOutputData() + } else if len(request.Event.GetOutputUri()) > 0 { + fmt.Printf("Got output URI") + // GetInputs actually fetches the data, even though this is an output + outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), + m.storageClient, request.Event.GetOutputUri()) + if err != nil { + fmt.Printf("Error fetching output literal map %v", request.Event) + } + } else { + fmt.Printf("No output data found for %v\n", request.Event) + } + if outputs != nil { + urls := common.FlyteURLsFromTaskExecutionID(taskExecutionID, false) + outputURLs := common.AppendLinksForLiteralMap(urls.GetOutputs(), *outputs) + for k, v := range outputs.Literals { + as := artifact.ArtifactSpec{ + Value: v, + // Type, tags and aliases will need to be filled in later + Source: &artifact.ArtifactSpec_TaskExecution{ + TaskExecution: &taskExecutionID, + }, + } + ak := core.ArtifactKey{ + Project: request.Event.ParentNodeExecutionId.ExecutionId.Project, + Domain: request.Event.ParentNodeExecutionId.ExecutionId.Domain, + // This will need to be filled in later, will need to pull from task template, or set to + // something pretty unique, like the task ID. + Name: "", + } + + a := artifact.CreateArtifactRequest{ + ArtifactKey: &ak, + Version: request.Event.ParentNodeExecutionId.ExecutionId.Name, + Uri: outputURLs[k], + Spec: &as, + } + e := event.ArtifactCreateEvent{ + CreateRequest: &a, + } + print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) + } + } +} + func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) ( *admin.TaskExecutionEventResponse, error) { if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil { @@ -202,6 +256,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req if request.Event.GetOutputData() != nil { m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } + m.tempHandleArtifactEventEmitting(ctx, request, taskExecutionID) } if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil { From 8b6ca021385f4ffb24fe36241cba46c7aaeb22fa Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 2 Jun 2023 09:32:25 -0700 Subject: [PATCH 2/8] added printing of usage and saving in execution spec metadata Signed-off-by: Yee Hing Tong --- pkg/manager/impl/execution_manager.go | 110 +++++++++++------- .../impl/validation/execution_validator.go | 33 ++++-- 2 files changed, 93 insertions(+), 50 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index b5bc4a8cf..5a31d7d5b 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -452,7 +452,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, error) { + context.Context, *models.Execution, map[string]*core.ArtifactID, error) { taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: request.Spec.LaunchPlan.Project, @@ -461,11 +461,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( Version: request.Spec.LaunchPlan.Version, }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } task, err := transformers.FromTaskModel(taskModel) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Prepare a skeleton workflow @@ -474,15 +474,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task) if err != nil { logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err) - return nil, nil, err + return nil, nil, nil, err } workflow, err := transformers.FromWorkflowModel(*workflowModel) if err != nil { - return nil, nil, err + return nil, nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { - return nil, nil, err + return nil, nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -490,10 +490,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier, workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - executionInputs, err := validation.CheckAndFetchInputsForExecution( + executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, @@ -502,7 +502,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, err + return nil, nil, nil, err } name := util.GetExecutionName(request) @@ -520,13 +520,14 @@ func (m *ExecutionManager) launchSingleTaskExecution( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) + requestSpec.Metadata.ArtifactIds = resolvedArtifactMap // Get the node execution (if any) that launched this execution var parentNodeExecutionID uint var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Dynamically assign task resource defaults. @@ -540,15 +541,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var labels map[string]string @@ -558,7 +559,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var annotations map[string]string @@ -573,7 +574,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -591,7 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") if err != nil { - return nil, nil, err + return nil, nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -616,7 +617,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, request.Inputs, err) - return nil, nil, err + return nil, nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -657,10 +658,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, err + return nil, nil, nil, err } m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs))) - return ctx, executionModel, nil + return ctx, executionModel, resolvedArtifactMap, nil } func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { @@ -710,11 +711,17 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, error) { + context.Context, *models.Execution, map[string]*core.ArtifactID, error) { + + // Resolve artifacts. + // two sources of artifacts: launch plan and create execute request. + // - within the launch plan, the artifact will be in the Parameter map, and can come in the Literal, + // or as an ArtifactQuery. + err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration()) if err != nil { logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, err + return nil, nil, nil, err } if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK { logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan) @@ -724,14 +731,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan) if err != nil { logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, err + return nil, nil, nil, err } launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel) if err != nil { logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) - return nil, nil, err + return nil, nil, nil, err } - executionInputs, err := validation.CheckAndFetchInputsForExecution( + // Artifacts retrieved will need to be stored somewhere to ensure that we can re-emit events if necessary + // in the future, and also to make sure that relaunch and recover can use it if necessary. + executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, @@ -741,23 +750,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, err + return nil, nil, nil, err } workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } workflow, err := transformers.FromWorkflowModel(workflowModel) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -774,13 +783,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) + requestSpec.Metadata.ArtifactIds = resolvedArtifactMap // Get the node and parent execution (if any) that launched this execution var parentNodeExecutionID uint var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Dynamically assign task resource defaults. @@ -794,16 +804,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan) if err != nil { - return nil, nil, err + return nil, nil, nil, err } namespace := common.GetNamespaceName( @@ -811,15 +821,15 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, err + return nil, nil, nil, err } annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var rawOutputDataConfig *admin.RawOutputDataConfig if executionConfig.RawOutputDataConfig != nil { @@ -828,7 +838,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -846,7 +856,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -872,7 +882,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, executionInputs, err) - return nil, nil, err + return nil, nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -913,9 +923,9 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, err + return nil, nil, nil, err } - return ctx, executionModel, nil + return ctx, executionModel, resolvedArtifactMap, nil } // Inserts an execution model into the database store and emits platform metrics. @@ -939,6 +949,17 @@ func (m *ExecutionManager) createExecutionModel( return &workflowExecutionIdentifier, nil } +func (m *ExecutionManager) handleArtifactEvents(artifactIDs map[string]*core.ArtifactID, wfExecID core.WorkflowExecutionIdentifier) { + if artifactIDs != nil { + fmt.Printf("WF exec used %v", wfExecID.String()) + for _, artifactID := range artifactIDs { + if artifactID != nil { + fmt.Printf("artifactID: %v\n", artifactID) + } + } + } +} + func (m *ExecutionManager) CreateExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( *admin.ExecutionCreateResponse, error) { @@ -948,7 +969,8 @@ func (m *ExecutionManager) CreateExecution( } var executionModel *models.Execution var err error - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) + var artifactIDs map[string]*core.ArtifactID + ctx, executionModel, artifactIDs, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) if err != nil { return nil, err } @@ -956,6 +978,7 @@ func (m *ExecutionManager) CreateExecution( if err != nil { return nil, err } + m.handleArtifactEvents(artifactIDs, *workflowExecutionIdentifier) return &admin.ExecutionCreateResponse{ Id: workflowExecutionIdentifier, }, nil @@ -997,7 +1020,7 @@ func (m *ExecutionManager) RelaunchExecution( executionSpec.Metadata.ReferenceExecution = existingExecution.Id executionSpec.OverwriteCache = request.GetOverwriteCache() var executionModel *models.Execution - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + ctx, executionModel, artifactIDs, err := m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, @@ -1012,6 +1035,7 @@ func (m *ExecutionManager) RelaunchExecution( if err != nil { return nil, err } + m.handleArtifactEvents(artifactIDs, *workflowExecutionIdentifier) logger.Debugf(ctx, "Successfully relaunched [%+v] as [%+v]", request.Id, workflowExecutionIdentifier) return &admin.ExecutionCreateResponse{ Id: workflowExecutionIdentifier, @@ -1048,7 +1072,7 @@ func (m *ExecutionManager) RecoverExecution( executionSpec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED executionSpec.Metadata.ReferenceExecution = existingExecution.Id var executionModel *models.Execution - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + ctx, executionModel, _, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, diff --git a/pkg/manager/impl/validation/execution_validator.go b/pkg/manager/impl/validation/execution_validator.go index 73c31e1cc..6920f44e4 100644 --- a/pkg/manager/impl/validation/execution_validator.go +++ b/pkg/manager/impl/validation/execution_validator.go @@ -76,11 +76,14 @@ func ValidateExecutionRequest(ctx context.Context, request admin.ExecutionCreate return nil } +// CheckAndFetchInputsForExecution will merge inputs and also resolve any artifacts that are required. +// A map will be returned for all artifacts used. func CheckAndFetchInputsForExecution( - userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, error) { + userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, map[string]*core.ArtifactID, error) { executionInputMap := map[string]*core.Literal{} expectedInputMap := map[string]*core.Parameter{} + resolvedArtifactMap := map[string]*core.ArtifactID{} if expectedInputs != nil && len(expectedInputs.GetParameters()) > 0 { expectedInputMap = expectedInputs.GetParameters() @@ -89,7 +92,7 @@ func CheckAndFetchInputsForExecution( if userInputs != nil && len(userInputs.GetLiterals()) > 0 { for name, value := range userInputs.GetLiterals() { if _, ok := expectedInputMap[name]; !ok { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) + return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) } executionInputMap[name] = value } @@ -98,13 +101,20 @@ func CheckAndFetchInputsForExecution( for name, expectedInput := range expectedInputMap { if _, ok := executionInputMap[name]; !ok { if expectedInput.GetRequired() { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) + return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) + } + // Look up from Artifact service if necessary + if expectedInput.GetArtifactQuery() != nil { + // should be executionInputMap[name] = artifactService.query(expectedInput.GetArtifactQuery()) + executionInputMap[name] = expectedInput.GetDefault() + } else { + // At this point, the Literal returned by GetDefault might still be an ArtifactID + executionInputMap[name] = expectedInput.GetDefault() } - executionInputMap[name] = expectedInput.GetDefault() } else { inputType := validators.LiteralTypeForLiteral(executionInputMap[name]) if !validators.AreTypesCastable(inputType, expectedInput.GetVar().GetType()) { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) + return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) } } } @@ -112,15 +122,24 @@ func CheckAndFetchInputsForExecution( if fixedInputs != nil && len(fixedInputs.GetLiterals()) > 0 { for name, fixedInput := range fixedInputs.GetLiterals() { if _, ok := executionInputMap[name]; ok { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) + return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) } executionInputMap[name] = fixedInput } } + // Resolve any artifacts that are required. + for name, input := range executionInputMap { + if input.GetArtifactId() != nil { + resolvedArtifactMap[name] = input.GetArtifactId() + // Replace the reference with the actual literal + // executionInputMap[name] = artifactService.GetArtifact(input.GetArtifactId()) + } + } + return &core.LiteralMap{ Literals: executionInputMap, - }, nil + }, resolvedArtifactMap, nil } func CheckValidExecutionID(executionID, fieldName string) error { From f1b0d368223bfd55ff10735d5d9a8add5f5d207a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 19 Jul 2023 14:32:43 -0700 Subject: [PATCH 3/8] compiling Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 9 +- flyteadmin_config.yaml | 4 + go.mod | 7 +- go.sum | 6 + pkg/async/cloudevent/factory.go | 61 +++-- .../implementations/cloudevent_publisher.go | 233 +++++++++++++++++- pkg/async/cloudevent/redis/redis_publisher.go | 62 +++++ .../cloudevent/redis/redis_publisher_test.go | 33 +++ pkg/common/cloud.go | 1 + pkg/common/flyte_url.go | 7 + pkg/manager/impl/execution_manager.go | 184 +++++++++----- pkg/manager/impl/task_execution_manager.go | 121 +++++---- pkg/rpc/adminservice/base.go | 3 +- .../interfaces/application_configuration.go | 20 ++ 14 files changed, 605 insertions(+), 146 deletions(-) create mode 100644 pkg/async/cloudevent/redis/redis_publisher.go create mode 100644 pkg/async/cloudevent/redis/redis_publisher_test.go diff --git a/dataproxy/service.go b/dataproxy/service.go index 604152bb1..7e35cdcaa 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,13 +5,14 @@ import ( "encoding/base32" "encoding/base64" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "net/url" "reflect" "strconv" "strings" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyteadmin/pkg/common" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -103,9 +104,7 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp Project: req.Project, Domain: req.Domain, }, - Version: string(req.ContentMd5), - Uri: "", - Spec: req.GetArtifactSpec(), + Spec: req.GetArtifactSpec(), } fmt.Printf("Will call artifact service with request: %v\n", artifactCreate) // artifact := artifact_service.CreateArtifact @@ -120,9 +119,7 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp ArtifactKey: &core.ArtifactKey{ Project: req.Project, Domain: req.Domain, - Name: fmt.Sprintf("autogenerated.%s", req.Filename), }, - Version: string(req.ContentMd5), }, Uri: "flyte://random/returned/url", Spec: req.GetArtifactSpec(), diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index e3d19f732..0632f0045 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -112,6 +112,10 @@ externalEvents: eventsPublisher: topicName: "bar" eventTypes: all +cloudEvents: + type: redis + redis: + addr: "localhost:6379" Logger: show-source: true level: 5 diff --git a/go.mod b/go.mod index 77b035563..3efb908c4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.7 + github.com/flyteorg/flyteidl v1.5.8-0.20230714175257-ad36544662c7 github.com/flyteorg/flyteplugins v1.0.56 github.com/flyteorg/flytepropeller v1.1.87 github.com/flyteorg/flytestdlib v1.0.15 @@ -85,11 +85,12 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coocood/freecache v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0 // indirect github.com/dgraph-io/ristretto v0.0.3 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect @@ -158,6 +159,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect + github.com/redis/go-redis/v9 v9.0.5 // indirect github.com/sendgrid/rest v2.6.8+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/afero v1.8.2 // indirect @@ -209,6 +211,7 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a + replace github.com/flyteorg/flyteidl => ../flyteidl // Retracted versions diff --git a/go.sum b/go.sum index 255fa8893..4cc33e103 100644 --- a/go.sum +++ b/go.sum @@ -191,6 +191,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -264,6 +266,8 @@ github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= @@ -1306,6 +1310,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= +github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rhnvrm/simples3 v0.5.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/pkg/async/cloudevent/factory.go b/pkg/async/cloudevent/factory.go index a43d8749b..728e92e3a 100644 --- a/pkg/async/cloudevent/factory.go +++ b/pkg/async/cloudevent/factory.go @@ -4,6 +4,10 @@ import ( "context" "time" + dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces" + repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" + "github.com/flyteorg/flytestdlib/storage" + "github.com/NYTimes/gizmo/pubsub" gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws" gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp" @@ -13,6 +17,7 @@ import ( "github.com/flyteorg/flyteadmin/pkg/async" cloudEventImplementations "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/implementations" "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces" + redisPublisher "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/redis" "github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations" "github.com/flyteorg/flyteadmin/pkg/common" runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" @@ -20,18 +25,20 @@ import ( "github.com/flyteorg/flytestdlib/promutils" ) -func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.CloudEventsConfig, scope promutils.Scope) interfaces.Publisher { - if !config.Enable { +func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Repository, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, cloudEventsConfig runtimeInterfaces.CloudEventsConfig, remoteDataConfig runtimeInterfaces.RemoteDataConfig, scope promutils.Scope) interfaces.Publisher { + if !cloudEventsConfig.Enable { return implementations.NewNoopPublish() } - reconnectAttempts := config.ReconnectAttempts - reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second - switch config.Type { + reconnectAttempts := cloudEventsConfig.ReconnectAttempts + reconnectDelay := time.Duration(cloudEventsConfig.ReconnectDelaySeconds) * time.Second + + var sender interfaces.Sender + switch cloudEventsConfig.Type { case common.AWS: snsConfig := gizmoAWS.SNSConfig{ - Topic: config.EventsPublisherConfig.TopicName, + Topic: cloudEventsConfig.EventsPublisherConfig.TopicName, } - snsConfig.Region = config.AWSConfig.Region + snsConfig.Region = cloudEventsConfig.AWSConfig.Region var publisher pubsub.Publisher var err error @@ -44,12 +51,13 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud if err != nil { panic(err) } - return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes) + sender = &cloudEventImplementations.PubSubSender{Pub: publisher} + case common.GCP: pubsubConfig := gizmoGCP.Config{ - Topic: config.EventsPublisherConfig.TopicName, + Topic: cloudEventsConfig.EventsPublisherConfig.TopicName, } - pubsubConfig.ProjectID = config.GCPConfig.ProjectID + pubsubConfig.ProjectID = cloudEventsConfig.GCPConfig.ProjectID var publisher pubsub.MultiPublisher var err error err = async.Retry(reconnectAttempts, reconnectDelay, func() error { @@ -60,30 +68,51 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud if err != nil { panic(err) } - return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes) + sender = &cloudEventImplementations.PubSubSender{Pub: publisher} + case cloudEventImplementations.Kafka: saramaConfig := sarama.NewConfig() var err error - saramaConfig.Version, err = sarama.ParseKafkaVersion(config.KafkaConfig.Version) + saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version) if err != nil { logger.Fatalf(ctx, "failed to parse kafka version, %v", err) panic(err) } - sender, err := kafka_sarama.NewSender(config.KafkaConfig.Brokers, saramaConfig, config.EventsPublisherConfig.TopicName) + kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName) if err != nil { panic(err) } - client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + client, err := cloudevents.NewClient(kafkaSender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { logger.Fatalf(ctx, "failed to create kafka client, %v", err) panic(err) } - return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.KafkaSender{Client: client}, scope, config.EventsPublisherConfig.EventTypes) + sender = &cloudEventImplementations.KafkaSender{Client: client} + + case common.Redis: + var publisher pubsub.Publisher + var err error + err = async.Retry(reconnectAttempts, reconnectDelay, func() error { + publisher, err = redisPublisher.NewPublisher(cloudEventsConfig.RedisConfig) + return err + }) + + // Persistent errors should hard fail + if err != nil { + panic(err) + } + sender = &cloudEventImplementations.PubSubSender{Pub: publisher} + case common.Local: fallthrough default: logger.Infof(ctx, - "Using default noop cloud events publisher implementation for config type [%s]", config.Type) + "Using default noop cloud events publisher implementation for config type [%s]", cloudEventsConfig.Type) return implementations.NewNoopPublish() } + + if !cloudEventsConfig.TransformToCloudEvents { + return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes) + } + return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig) } diff --git a/pkg/async/cloudevent/implementations/cloudevent_publisher.go b/pkg/async/cloudevent/implementations/cloudevent_publisher.go index 53dba62d4..fb3bb603e 100644 --- a/pkg/async/cloudevent/implementations/cloudevent_publisher.go +++ b/pkg/async/cloudevent/implementations/cloudevent_publisher.go @@ -7,13 +7,20 @@ import ( "reflect" "time" - "github.com/golang/protobuf/jsonpb" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces" + "github.com/flyteorg/flyteadmin/pkg/manager/impl/util" + repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" + "github.com/flyteorg/flyteadmin/pkg/repositories/transformers" + runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytestdlib/storage" - "github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations" + "github.com/golang/protobuf/jsonpb" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "k8s.io/apimachinery/pkg/util/sets" @@ -107,20 +114,217 @@ func (p *Publisher) shouldPublishEvent(notificationType string) bool { return p.events.Has(notificationType) } +type CloudEventWrappedPublisher struct { + db repositoryInterfaces.Repository + sender interfaces.Sender + systemMetrics implementations.EventPublisherSystemMetrics + storageClient *storage.DataStore + urlData dataInterfaces.RemoteURLInterface + remoteDataConfig runtimeInterfaces.RemoteDataConfig +} + +func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context.Context, rawEvent *event.WorkflowExecutionEvent) (*event.CloudEventWorkflowExecution, error) { + + // Basic error checking + if rawEvent == nil { + return nil, fmt.Errorf("nothing to publish, WorkflowExecution event is nil") + } + if rawEvent.ExecutionId == nil { + logger.Warningf(ctx, "nil execution id in event [%+v]", rawEvent) + return nil, fmt.Errorf("nil execution id in event [%+v]", rawEvent) + } + + // TODO: Make this one call to the DB instead of two. + executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{ + Project: rawEvent.ExecutionId.Project, + Domain: rawEvent.ExecutionId.Domain, + Name: rawEvent.ExecutionId.Name, + }) + ex, err := transformers.FromExecutionModel(ctx, executionModel, nil) + if ex.Closure.WorkflowId == nil { + logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex) + return nil, fmt.Errorf("workflow id is nil for execution [%+v]", ex) + } + workflowModel, err := c.db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{ + Project: ex.Closure.WorkflowId.Project, + Domain: ex.Closure.WorkflowId.Domain, + Name: ex.Closure.WorkflowId.Name, + Version: ex.Closure.WorkflowId.Version, + }) + var workflowInterface core.TypedInterface + if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 { + err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface) + if err != nil { + return nil, fmt.Errorf( + "artifact eventing - failed to unmarshal TypedInterface for workflow [%+v] with err: %v", + workflowModel.ID, err) + } + } + + var outputs *core.LiteralMap + if rawEvent.GetOutputData() != nil { + fmt.Printf("remove this - Got output data") + outputs = rawEvent.GetOutputData() + } else if len(rawEvent.GetOutputUri()) > 0 { + fmt.Printf("remove this - Got output URI") + // GetInputs actually fetches the data, even though this is an output + outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri()) + if err != nil { + // TODO: metric this + logger.Warningf(ctx, "Error fetching output literal map %v", rawEvent) + return nil, err + } + } + + if outputs == nil { + // todo: remove after testing + logger.Debugf(ctx, "Output data was nil for %v", rawEvent) + } + + return &event.CloudEventWorkflowExecution{ + RawEvent: rawEvent, + OutputData: outputs, + OutputInterface: &workflowInterface, + }, nil +} + +func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) { + return &event.CloudEventNodeExecution{ + RawEvent: rawEvent, + }, nil +} + +func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) { + if rawEvent == nil { + return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil") + } + + taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ + Project: rawEvent.TaskId.Project, + Domain: rawEvent.TaskId.Domain, + Name: rawEvent.TaskId.Name, + Version: rawEvent.TaskId.Version, + }) + if err != nil { + // TODO: metric this + logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", rawEvent.TaskId, err) + return nil, err + } + task, err := transformers.FromTaskModel(taskModel) + + var outputs *core.LiteralMap + if rawEvent.GetOutputData() != nil { + fmt.Printf("remove this - task Got output data") + outputs = rawEvent.GetOutputData() + } else if len(rawEvent.GetOutputUri()) > 0 { + fmt.Printf("remove this - task Got output URI") + // GetInputs actually fetches the data, even though this is an output + outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, + c.storageClient, rawEvent.GetOutputUri()) + if err != nil { + fmt.Printf("Error fetching output literal map %v", rawEvent) + return nil, err + } + } + if outputs == nil { + // todo: remove + fmt.Printf("No output data found for task execution %v\n", rawEvent) + } + + return &event.CloudEventTaskExecution{ + RawEvent: rawEvent, + OutputData: outputs, + OutputInterface: task.Closure.CompiledTask.Template.Interface, + }, nil +} + +func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { + c.systemMetrics.PublishTotal.Inc() + logger.Debugf(ctx, "Publishing the following message [%+v]", msg) + + var err error + var executionID string + var phase string + var eventTime time.Time + var finalMsg proto.Message + + switch msgType := msg.(type) { + case *admin.WorkflowExecutionEventRequest: + e := msgType.Event + executionID = e.ExecutionId.String() + phase = e.Phase.String() + eventTime = e.OccurredAt.AsTime() + + finalMsg, err = c.TransformWorkflowExecutionEvent(ctx, e) + if err != nil { + logger.Errorf(ctx, "Failed to transform workflow execution event with error: %v", err) + return err + } + + case *admin.TaskExecutionEventRequest: + e := msgType.Event + executionID = e.TaskId.String() + phase = e.Phase.String() + eventTime = e.OccurredAt.AsTime() + finalMsg, err = c.TransformTaskExecutionEvent(ctx, e) + case *admin.NodeExecutionEventRequest: + e := msgType.Event + executionID = msgType.Event.Id.String() + phase = e.Phase.String() + eventTime = e.OccurredAt.AsTime() + finalMsg, err = c.TransformNodeExecutionEvent(ctx, e) + default: + return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg)) + } + + // Explicitly jsonpb marshal the proto. Otherwise, event.SetData will use json.Marshal which doesn't work well + // with proto oneof fields. + marshaler := jsonpb.Marshaler{} + buf := bytes.NewBuffer([]byte{}) + err = marshaler.Marshal(buf, finalMsg) + if err != nil { + c.systemMetrics.PublishError.Inc() + logger.Errorf(ctx, "Failed to jsonpb marshal [%v] with error: %v", msg, err) + return fmt.Errorf("failed to jsonpb marshal [%v] with error: %w", msg, err) + } + + cloudEvt := cloudevents.NewEvent() + // CloudEvent specification: https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes + cloudEvt.SetType(fmt.Sprintf("%v.%v", cloudEventTypePrefix, notificationType)) + cloudEvt.SetSource(cloudEventSource) + cloudEvt.SetID(fmt.Sprintf("%v.%v", executionID, phase)) + cloudEvt.SetTime(eventTime) + cloudEvt.SetExtension(jsonSchemaURLKey, jsonSchemaURL) + + if err := cloudEvt.SetData(cloudevents.ApplicationJSON, buf.Bytes()); err != nil { + c.systemMetrics.PublishError.Inc() + logger.Errorf(ctx, "Failed to encode message [%v] with error: %v", msg, err) + return err + } + + if err := c.sender.Send(ctx, notificationType, cloudEvt); err != nil { + c.systemMetrics.PublishError.Inc() + logger.Errorf(ctx, "Failed to send message [%v] with error: %v", msg, err) + return err + } + c.systemMetrics.PublishSuccess.Inc() + return nil +} + func NewCloudEventsPublisher(sender interfaces.Sender, scope promutils.Scope, eventTypes []string) interfaces.Publisher { eventSet := sets.NewString() - for _, event := range eventTypes { - if event == implementations.AllTypes || event == implementations.AllTypesShort { + for _, eventType := range eventTypes { + if eventType == implementations.AllTypes || eventType == implementations.AllTypesShort { for _, e := range implementations.SupportedEvents { eventSet = eventSet.Insert(e) } break } - if e, found := implementations.SupportedEvents[event]; found { + if e, found := implementations.SupportedEvents[eventType]; found { eventSet = eventSet.Insert(e) } else { - panic(fmt.Errorf("unsupported event type [%s] in the config", event)) + panic(fmt.Errorf("unsupported event type [%s] in the config", eventType)) } } @@ -130,3 +334,16 @@ func NewCloudEventsPublisher(sender interfaces.Sender, scope promutils.Scope, ev events: eventSet, } } + +func NewCloudEventsWrappedPublisher( + db repositoryInterfaces.Repository, sender interfaces.Sender, scope promutils.Scope, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, remoteDataConfig runtimeInterfaces.RemoteDataConfig) interfaces.Publisher { + + return &CloudEventWrappedPublisher{ + db: db, + sender: sender, + systemMetrics: implementations.NewEventPublisherSystemMetrics(scope.NewSubScope("cloudevents_publisher")), + storageClient: storageClient, + urlData: urlData, + remoteDataConfig: remoteDataConfig, + } +} diff --git a/pkg/async/cloudevent/redis/redis_publisher.go b/pkg/async/cloudevent/redis/redis_publisher.go new file mode 100644 index 000000000..5c3c6e77c --- /dev/null +++ b/pkg/async/cloudevent/redis/redis_publisher.go @@ -0,0 +1,62 @@ +package redis + +import ( + "context" + "fmt" + "github.com/flyteorg/flytestdlib/logger" + + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/golang/protobuf/proto" + redisAPI "github.com/redis/go-redis/v9" +) + +// Publisher satisfies the gizmo/pubsub.Publisher interface. +type Publisher struct { + config interfaces.RedisConfig + client *redisAPI.Client + topicName string +} + +func (p *Publisher) Publish(ctx context.Context, topic string, msg proto.Message) error { + if len(topic) == 0 { + topic = p.topicName + } + logger.Debugf(ctx, "Publishing message to topic [%s]", topic) + + msgBytes, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + res := p.client.Publish(ctx, topic, msgBytes) + if res.Err() != nil { + return fmt.Errorf("failed to publish message to topic [%s]: %w", topic, res.Err()) + } + return nil +} + +func (p *Publisher) PublishRaw(ctx context.Context, topic string, msgBytes []byte) error { + if len(topic) == 0 { + topic = p.topicName + } + logger.Debugf(ctx, "Publishing raw message to topic [%s]", topic) + + res := p.client.Publish(ctx, topic, msgBytes) + if res.Err() != nil { + return fmt.Errorf("failed to publish raw message to topic [%s]: %w", topic, res.Err()) + } + return nil +} + +func NewPublisher(config interfaces.RedisConfig) (*Publisher, error) { + client := redisAPI.NewClient(&redisAPI.Options{ + Addr: config.Addr, + Password: config.Password, + DB: config.DB, + }) + + return &Publisher{ + config: config, + client: client, + }, nil +} diff --git a/pkg/async/cloudevent/redis/redis_publisher_test.go b/pkg/async/cloudevent/redis/redis_publisher_test.go new file mode 100644 index 000000000..74a9776b4 --- /dev/null +++ b/pkg/async/cloudevent/redis/redis_publisher_test.go @@ -0,0 +1,33 @@ +package redis + +import ( + "context" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "testing" + + redisAPI "github.com/redis/go-redis/v9" +) + +func TestRjkl(t *testing.T) { + ctx := context.TODO() + + client := redisAPI.NewClient(&redisAPI.Options{ + Addr: "localhost:6379", + }) + + e := event.WorkflowExecutionEvent{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + ProducerId: "", + Phase: 0, + } + ee, _ := proto.Marshal(&e) + r := client.Publish(ctx, "channel007", ee) + assert.NoError(t, r.Err()) +} diff --git a/pkg/common/cloud.go b/pkg/common/cloud.go index cba0a6879..5e1ca4773 100644 --- a/pkg/common/cloud.go +++ b/pkg/common/cloud.go @@ -8,5 +8,6 @@ const ( AWS CloudProvider = "aws" GCP CloudProvider = "gcp" Local CloudProvider = "local" + Redis CloudProvider = "redis" None CloudProvider = "none" ) diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 1b87d05f3..2611a8f1c 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -139,6 +139,13 @@ func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, return res } +// FlyteURLKeyFromNodeExecutionIDAndOutput is a modified version of the function above. +func FlyteURLKeyFromNodeExecutionIDAndOutput(nodeExecutionID core.NodeExecutionIdentifier, artifactType ArtifactType, outputName string) string { + res := fmt.Sprintf("%s/%s/%s/%s", nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId, artifactType, outputName) + + return res +} + func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, deck bool) *admin.FlyteURLs { base := fmt.Sprintf("flyte://v1/%s/%s/%s/%s/%s", taskExecutionID.NodeExecutionId.ExecutionId.Project, taskExecutionID.NodeExecutionId.ExecutionId.Domain, taskExecutionID.NodeExecutionId.ExecutionId.Name, taskExecutionID.NodeExecutionId.NodeId, strconv.Itoa(int(taskExecutionID.RetryAttempt))) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 5a31d7d5b..e1e5b2804 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -3,8 +3,6 @@ package impl import ( "context" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "strconv" "time" @@ -1221,62 +1219,127 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime( watch.Observe(*executionModel.ExecutionCreatedAt, terminalEventTime) } -func (m *ExecutionManager) tempHandleArtifactEventEmitting(ctx context.Context, request admin.WorkflowExecutionEventRequest) { - // Print out what the catalog service will eventually do. Can all this be retrieved just from the raw event? No. - // Missing: Artifact name and other info declared in the wf decorator. - var outputs *core.LiteralMap - var err error - if request.Event.GetOutputData() != nil { - fmt.Printf("Got output data") - outputs = request.Event.GetOutputData() - } else if len(request.Event.GetOutputUri()) > 0 { - fmt.Printf("Got output URI") - // GetInputs actually fetches the data, even though this is an output - outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), - m.storageClient, request.Event.GetOutputUri()) - if err != nil { - fmt.Printf("Error fetching output literal map %v", request.Event) - } - } else { - fmt.Printf("No output data found for %v\n", request.Event) - } - if outputs != nil { - nodeExecutionID := core.NodeExecutionIdentifier{ - NodeId: "end-node", - ExecutionId: request.Event.ExecutionId, - } - urls := common.FlyteURLsFromNodeExecutionID(nodeExecutionID, false) - outputURLs := common.AppendLinksForLiteralMap(urls.GetInputs(), *outputs) - - for k, v := range outputs.Literals { - as := artifact.ArtifactSpec{ - Value: v, - // Type, tags and aliases will need to be filled in later - Source: &artifact.ArtifactSpec_Execution{ - Execution: request.Event.ExecutionId, - }, - } - ak := core.ArtifactKey{ - Project: request.Event.ExecutionId.Project, - Domain: request.Event.ExecutionId.Domain, - // This will need to be filled in later, will need to pull from task template, or set to - // something pretty unique, like the task ID. - Name: "", - } - - a := artifact.CreateArtifactRequest{ - ArtifactKey: &ak, - Version: request.Event.ExecutionId.Name, - Uri: outputURLs[k], - Spec: &as, - } - e := event.ArtifactCreateEvent{ - CreateRequest: &a, - } - print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) - } - } -} +//// getAliases creates a list of aliases for a given output in a workflow execution. It should be called once per +//// output for a given workflow execution. +//func (m *ExecutionManager) getAliases(workflowID core.Identifier, execID core.WorkflowExecutionIdentifier, typedInterface core.TypedInterface, outputName string) ([]*artifact.Alias, error) { +// +// if v, ok := typedInterface.Outputs.Variables[outputName]; ok { +// defaultAlias := &artifact.Alias{ +// Name: fmt.Sprintf("%s/%s", workflowID.Name, outputName), +// Value: execID.Name, +// } +// +// if v.Artifact != nil && len(v.Artifact.Spec.Aliases) > 0 { +// aliases := make([]*artifact.Alias, 0, len(v.Artifact.Spec.Aliases)+1) +// aliases = append(aliases, defaultAlias) +// for _, a := range v.Artifact.Spec.Aliases { +// aliases = append(aliases, &artifact.Alias{ +// Name: a.Name, +// Value: a.Value, +// }) +// } +// return aliases, nil +// } +// +// // If nothing specified by the user, just return the default alias. +// return []*artifact.Alias{defaultAlias}, nil +// } +// return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "output [%s] not found in workflow interface [%v] for workflow [%v]", outputName, typedInterface, workflowID) +//} +// +//func (m *ExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.WorkflowExecutionEventRequest) { +// // Basic error checking +// if request.Event.ExecutionId == nil { +// logger.Warningf(ctx, "nil execution id in event request [%+v]", request) +// return +// } +// +// // TODO: Make this one call to the DB instead of two. +// executionModel, err := m.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{ +// Project: request.Event.ExecutionId.Project, +// Domain: request.Event.ExecutionId.Domain, +// Name: request.Event.ExecutionId.Name, +// }) +// ex, err := transformers.FromExecutionModel(ctx, executionModel, nil) +// if ex.Closure.WorkflowId == nil { +// logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex) +// return +// } +// workflowModel, err := m.db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{ +// Project: ex.Closure.WorkflowId.Project, +// Domain: ex.Closure.WorkflowId.Domain, +// Name: ex.Closure.WorkflowId.Name, +// Version: ex.Closure.WorkflowId.Version, +// }) +// var workflowInterface core.TypedInterface +// if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 { +// err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface) +// if err != nil { +// logger.Errorf(ctx, +// "Artifact eventing - failed to unmarshal TypedInterface for workflow [%+v] with err: %v", +// workflowModel.ID, err) +// return +// } +// } +// +// var outputs *core.LiteralMap +// if request.Event.GetOutputData() != nil { +// fmt.Printf("remove this - Got output data") +// outputs = request.Event.GetOutputData() +// } else if len(request.Event.GetOutputUri()) > 0 { +// fmt.Printf("remove this - Got output URI") +// // GetInputs actually fetches the data, even though this is an output +// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), +// m.storageClient, request.Event.GetOutputUri()) +// if err != nil { +// // TODO: metric this +// logger.Warningf(ctx, "Error fetching output literal map %v", request.Event) +// } +// } else { +// logger.Debugf(ctx, "Neither output data nor uri found for %v", request.Event) +// return +// } +// if outputs == nil { +// logger.Debugf(ctx, "Output data was nil for %v", request.Event) +// return +// } +// +// nodeExecutionID := core.NodeExecutionIdentifier{ +// NodeId: "end-node", +// ExecutionId: request.Event.ExecutionId, +// } +// +// for k, v := range outputs.Literals { +// // Use input type because workflow outputs are inputs to the end node. +// artifactKeySuffix := common.FlyteURLKeyFromNodeExecutionIDAndOutput(nodeExecutionID, common.ArtifactTypeI, k) +// +// aliases, err := m.getAliases(*ex.Closure.WorkflowId, *request.Event.ExecutionId, workflowInterface, k) +// if err != nil { +// logger.Errorf(ctx, "Failed getting alias for [%s] in workflow [%v], err: %v", k, ex.Closure.WorkflowId, err) +// } +// as := artifact.ArtifactSpec{ +// Value: v, +// Source: &artifact.ArtifactSpec_Execution{ +// Execution: request.Event.ExecutionId, +// }, +// Aliases: aliases, +// } +// ak := core.ArtifactKey{ +// Project: request.Event.ExecutionId.Project, +// Domain: request.Event.ExecutionId.Domain, +// Suffix: artifactKeySuffix, +// } +// +// a := artifact.CreateArtifactRequest{ +// ArtifactKey: &ak, +// Spec: &as, +// } +// e := event.ArtifactCreateEvent{ +// CreateRequest: &a, +// } +// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) +// } +//} func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) ( *admin.WorkflowExecutionEventResponse, error) { @@ -1363,7 +1426,10 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi if request.Event.GetOutputData() != nil { m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } - m.tempHandleArtifactEventEmitting(ctx, request) + //go func() { + // logger.Debugf(ctx, "Emitting workflow success artifact event flow for [%+v]", request) + // m.handleArtifactEventEmitting(ctx, request) + //}() err = m.publishNotifications(ctx, request, *executionModel) if err != nil { diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 832cee54c..06fef6489 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -3,7 +3,6 @@ package impl import ( "context" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "strconv" cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces" @@ -32,7 +31,6 @@ import ( runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/logger" "google.golang.org/grpc/codes" ) @@ -131,57 +129,69 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState( return *existingTaskExecution, nil } -func (m *TaskExecutionManager) tempHandleArtifactEventEmitting(ctx context.Context, request admin.TaskExecutionEventRequest, taskExecutionID core.TaskExecutionIdentifier) { - // Print out what the catalog service will eventually do. Can all this be retrieved just from the raw event? No. - // Missing is: tags/aliases/given name - var outputs *core.LiteralMap - var err error - if request.Event.GetOutputData() != nil { - fmt.Printf("Got output data") - outputs = request.Event.GetOutputData() - } else if len(request.Event.GetOutputUri()) > 0 { - fmt.Printf("Got output URI") - // GetInputs actually fetches the data, even though this is an output - outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), - m.storageClient, request.Event.GetOutputUri()) - if err != nil { - fmt.Printf("Error fetching output literal map %v", request.Event) - } - } else { - fmt.Printf("No output data found for %v\n", request.Event) - } - if outputs != nil { - urls := common.FlyteURLsFromTaskExecutionID(taskExecutionID, false) - outputURLs := common.AppendLinksForLiteralMap(urls.GetOutputs(), *outputs) - for k, v := range outputs.Literals { - as := artifact.ArtifactSpec{ - Value: v, - // Type, tags and aliases will need to be filled in later - Source: &artifact.ArtifactSpec_TaskExecution{ - TaskExecution: &taskExecutionID, - }, - } - ak := core.ArtifactKey{ - Project: request.Event.ParentNodeExecutionId.ExecutionId.Project, - Domain: request.Event.ParentNodeExecutionId.ExecutionId.Domain, - // This will need to be filled in later, will need to pull from task template, or set to - // something pretty unique, like the task ID. - Name: "", - } - - a := artifact.CreateArtifactRequest{ - ArtifactKey: &ak, - Version: request.Event.ParentNodeExecutionId.ExecutionId.Name, - Uri: outputURLs[k], - Spec: &as, - } - e := event.ArtifactCreateEvent{ - CreateRequest: &a, - } - print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) - } - } -} +//func (m *TaskExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.TaskExecutionEventRequest, taskExecutionID core.TaskExecutionIdentifier) { +// +// taskModel, err := m.db.TaskRepo().Get(ctx, repoInterfaces.Identifier{ +// Project: request.Event.TaskId.Project, +// Domain: request.Event.TaskId.Domain, +// Name: request.Event.TaskId.Name, +// Version: request.Event.TaskId.Version, +// }) +// if err != nil { +// // TODO: metric this +// logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", request.Event.TaskId, err) +// return +// } +// task, err := transformers.FromTaskModel(taskModel) +// task.Closure.CompiledTask.Template.Interface +// +// var outputs *core.LiteralMap +// if request.Event.GetOutputData() != nil { +// fmt.Printf("remove this - Got output data") +// outputs = request.Event.GetOutputData() +// } else if len(request.Event.GetOutputUri()) > 0 { +// fmt.Printf("remove this - Got output URI") +// // GetInputs actually fetches the data, even though this is an output +// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), +// m.storageClient, request.Event.GetOutputUri()) +// if err != nil { +// fmt.Printf("Error fetching output literal map %v", request.Event) +// } +// } else { +// fmt.Printf("No output data found for %v\n", request.Event) +// } +// if outputs != nil { +// urls := common.FlyteURLsFromTaskExecutionID(taskExecutionID, false) +// outputURLs := common.AppendLinksForLiteralMap(urls.GetOutputs(), *outputs) +// for k, v := range outputs.Literals { +// as := artifact.ArtifactSpec{ +// Value: v, +// // Type, tags and aliases will need to be filled in later +// Source: &artifact.ArtifactSpec_TaskExecution{ +// TaskExecution: &taskExecutionID, +// }, +// } +// ak := core.ArtifactKey{ +// Project: request.Event.ParentNodeExecutionId.ExecutionId.Project, +// Domain: request.Event.ParentNodeExecutionId.ExecutionId.Domain, +// // This will need to be filled in later, will need to pull from task template, or set to +// // something pretty unique, like the task ID. +// Name: "", +// } +// +// a := artifact.CreateArtifactRequest{ +// ArtifactKey: &ak, +// Version: request.Event.ParentNodeExecutionId.ExecutionId.Name, +// Uri: outputURLs[k], +// Spec: &as, +// } +// e := event.ArtifactCreateEvent{ +// CreateRequest: &a, +// } +// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) +// } +// } +//} func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) ( *admin.TaskExecutionEventResponse, error) { @@ -256,7 +266,10 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req if request.Event.GetOutputData() != nil { m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } - m.tempHandleArtifactEventEmitting(ctx, request, taskExecutionID) + //go func() { + // logger.Debugf(ctx, "Emitting task execution artifacts for [%+v] [%+v]", taskExecutionID, request) + // m.handleArtifactEventEmitting(ctx, request, taskExecutionID) + //}() } if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil { diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 77c78f480..4e3ee7f8a 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -101,7 +101,6 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope) - cloudEventPublisher := cloudevent.NewCloudEventsPublisher(ctx, *configuration.ApplicationConfiguration().GetCloudEventsConfig(), adminScope) go func() { logger.Info(ctx, "Started processing notifications.") processor.StartProcessing() @@ -130,6 +129,8 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi RemoteDataStoreClient: dataStorageClient, }).GetRemoteURLInterface() + cloudEventPublisher := cloudevent.NewCloudEventsPublisher(ctx, repo, dataStorageClient, urlData, *configuration.ApplicationConfiguration().GetCloudEventsConfig(), *configuration.ApplicationConfiguration().GetRemoteDataConfig(), adminScope) + workflowManager := manager.NewWorkflowManager( repo, configuration, workflowengineImpl.NewCompiler(), dataStorageClient, applicationConfiguration.GetMetadataStoragePrefix(), adminScope.NewSubScope("workflow_manager")) diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index cf9bf2e9e..aeb0ab34e 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -226,6 +226,23 @@ type KafkaConfig struct { Brokers []string `json:"brokers"` } +// RedisConfig is basically a subset of the client options in the Redis library +type RedisConfig struct { + // host:port address. + Addr string `json:"addr"` + // Use the specified Username to authenticate the current connection + // with one of the connections defined in the ACL list when connecting + // to a Redis 6.0 instance, or greater, that is using the Redis ACL system. + Username string `json:"username"` + // Optional password. Must match the password specified in the + // requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower), + // or the User Password when connecting to a Redis 6.0 instance, or greater, + // that is using the Redis ACL system. + Password string `json:"password"` + // Database to be selected after connecting to the server. + DB int `json:"db"` +} + // This section holds configuration for the event scheduler used to schedule workflow executions. type EventSchedulerConfig struct { // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' @@ -526,12 +543,15 @@ type CloudEventsConfig struct { AWSConfig AWSConfig `json:"aws"` GCPConfig GCPConfig `json:"gcp"` KafkaConfig KafkaConfig `json:"kafka"` + RedisConfig RedisConfig `json:"redis"` // Publish events to a pubsub tops EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"` // Number of times to attempt recreating a notifications processor client should there be any disruptions. ReconnectAttempts int `json:"reconnectAttempts"` // Specifies the time interval to wait before attempting to reconnect the notifications processor client. ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` + // Transform the raw events into the fuller cloudevent events before publishing + TransformToCloudEvents bool `json:"transformToCloudEvents"` } // Configuration specific to notifications handling From 9c748aea186d9c078a2e59f4aca9d9d7d4c12a13 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 24 Jul 2023 11:31:17 -0700 Subject: [PATCH 4/8] rename events, only transform on succeeded, background context, receiving on redis side Signed-off-by: Yee Hing Tong --- flyteadmin_config.yaml | 1 + pkg/async/cloudevent/factory.go | 1 + .../implementations/cloudevent_publisher.go | 29 +++++++++++++++++-- pkg/manager/impl/execution_manager.go | 3 +- pkg/manager/impl/task_execution_manager.go | 3 +- pkg/repositories/transformers/execution.go | 6 ++++ 6 files changed, 38 insertions(+), 5 deletions(-) diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index 0632f0045..ab56c8895 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -113,6 +113,7 @@ externalEvents: topicName: "bar" eventTypes: all cloudEvents: + enable: true type: redis redis: addr: "localhost:6379" diff --git a/pkg/async/cloudevent/factory.go b/pkg/async/cloudevent/factory.go index 728e92e3a..8166c6072 100644 --- a/pkg/async/cloudevent/factory.go +++ b/pkg/async/cloudevent/factory.go @@ -96,6 +96,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi publisher, err = redisPublisher.NewPublisher(cloudEventsConfig.RedisConfig) return err }) + logger.Infof(ctx, "Using Redis cloud events publisher [%v]", publisher) // Persistent errors should hard fail if err != nil { diff --git a/pkg/async/cloudevent/implementations/cloudevent_publisher.go b/pkg/async/cloudevent/implementations/cloudevent_publisher.go index fb3bb603e..44a29b1ac 100644 --- a/pkg/async/cloudevent/implementations/cloudevent_publisher.go +++ b/pkg/async/cloudevent/implementations/cloudevent_publisher.go @@ -134,13 +134,22 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context return nil, fmt.Errorf("nil execution id in event [%+v]", rawEvent) } + // For now, don't append any additional information unless succeeded + if rawEvent.Phase != core.WorkflowExecution_SUCCEEDED { + return &event.CloudEventWorkflowExecution{ + RawEvent: rawEvent, + OutputData: nil, + OutputInterface: nil, + }, nil + } + // TODO: Make this one call to the DB instead of two. executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: rawEvent.ExecutionId.Project, Domain: rawEvent.ExecutionId.Domain, Name: rawEvent.ExecutionId.Name, }) - ex, err := transformers.FromExecutionModel(ctx, executionModel, nil) + ex, err := transformers.FromExecutionModel(ctx, executionModel, transformers.DefaultExecutionTransformerOptions) if ex.Closure.WorkflowId == nil { logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex) return nil, fmt.Errorf("workflow id is nil for execution [%+v]", ex) @@ -199,6 +208,15 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil") } + // For now, don't append any additional information unless succeeded + if rawEvent.Phase != core.TaskExecution_SUCCEEDED { + return &event.CloudEventTaskExecution{ + RawEvent: rawEvent, + OutputData: nil, + OutputInterface: nil, + }, nil + } + taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: rawEvent.TaskId.Project, Domain: rawEvent.TaskId.Domain, @@ -247,9 +265,12 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy var phase string var eventTime time.Time var finalMsg proto.Message + // this is a modified notification type. will be used for both event type and publishing topic. + var topic string switch msgType := msg.(type) { case *admin.WorkflowExecutionEventRequest: + topic = "cloudevents.WorkflowExecution" e := msgType.Event executionID = e.ExecutionId.String() phase = e.Phase.String() @@ -262,12 +283,14 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy } case *admin.TaskExecutionEventRequest: + topic = "cloudevents.TaskExecution" e := msgType.Event executionID = e.TaskId.String() phase = e.Phase.String() eventTime = e.OccurredAt.AsTime() finalMsg, err = c.TransformTaskExecutionEvent(ctx, e) case *admin.NodeExecutionEventRequest: + topic = "cloudevents.NodeExecution" e := msgType.Event executionID = msgType.Event.Id.String() phase = e.Phase.String() @@ -290,7 +313,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy cloudEvt := cloudevents.NewEvent() // CloudEvent specification: https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes - cloudEvt.SetType(fmt.Sprintf("%v.%v", cloudEventTypePrefix, notificationType)) + cloudEvt.SetType(fmt.Sprintf("%v.%v", cloudEventTypePrefix, topic)) cloudEvt.SetSource(cloudEventSource) cloudEvt.SetID(fmt.Sprintf("%v.%v", executionID, phase)) cloudEvt.SetTime(eventTime) @@ -302,7 +325,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy return err } - if err := c.sender.Send(ctx, notificationType, cloudEvt); err != nil { + if err := c.sender.Send(ctx, topic, cloudEvt); err != nil { c.systemMetrics.PublishError.Inc() logger.Errorf(ctx, "Failed to send message [%v] with error: %v", msg, err) return err diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index e1e5b2804..0f5da69fb 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -1447,7 +1447,8 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi } go func() { - if err := m.cloudEventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil { + ceCtx := context.TODO() + if err := m.cloudEventPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil { m.systemMetrics.PublishEventError.Inc() logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err) } diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 06fef6489..9378006a9 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -278,7 +278,8 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req } go func() { - if err := m.cloudEventsPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil { + ceCtx := context.TODO() + if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil { logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err) } }() diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index de0d986af..eb54cb9c2 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -322,6 +322,12 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu func FromExecutionModel(ctx context.Context, executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) { var spec admin.ExecutionSpec var err error + if executionModel.Spec == nil { + fmt.Println("!!@ executionModel.Spec is nil") + } + if opts == nil { + fmt.Println("!!@ opts is nil") + } if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec") } From 30adee6713980a4aac80e080efd7f339579cd269 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 25 Jul 2023 15:42:38 -0700 Subject: [PATCH 5/8] marshal by json instead of bytes, remove helper function, add new key functions, update the source field Signed-off-by: Yee Hing Tong --- .../implementations/cloudevent_publisher.go | 18 ++++++++++++++- .../cloudevent/implementations/sender.go | 4 ++-- pkg/common/flyte_url.go | 22 +++++++++---------- pkg/manager/impl/node_execution_manager.go | 3 ++- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/pkg/async/cloudevent/implementations/cloudevent_publisher.go b/pkg/async/cloudevent/implementations/cloudevent_publisher.go index 44a29b1ac..fda8e75bd 100644 --- a/pkg/async/cloudevent/implementations/cloudevent_publisher.go +++ b/pkg/async/cloudevent/implementations/cloudevent_publisher.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/flyteorg/flyteadmin/pkg/common" "reflect" "time" @@ -267,6 +268,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy var finalMsg proto.Message // this is a modified notification type. will be used for both event type and publishing topic. var topic string + var eventSource = cloudEventSource switch msgType := msg.(type) { case *admin.WorkflowExecutionEventRequest: @@ -276,6 +278,11 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy phase = e.Phase.String() eventTime = e.OccurredAt.AsTime() + dummyNodeExecutionID := core.NodeExecutionIdentifier{ + NodeId: "end-node", + ExecutionId: e.ExecutionId, + } + eventSource = common.FlyteURLKeyFromNodeExecutionID(dummyNodeExecutionID, common.ArtifactTypeI) finalMsg, err = c.TransformWorkflowExecutionEvent(ctx, e) if err != nil { logger.Errorf(ctx, "Failed to transform workflow execution event with error: %v", err) @@ -288,6 +295,12 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy executionID = e.TaskId.String() phase = e.Phase.String() eventTime = e.OccurredAt.AsTime() + + if e.ParentNodeExecutionId == nil { + return fmt.Errorf("parent node execution id is nil for task execution [%+v]", e) + } + eventSource = common.FlyteURLKeyFromNodeExecutionIDRetry(*e.ParentNodeExecutionId, + int(e.RetryAttempt), common.ArtifactTypeO) finalMsg, err = c.TransformTaskExecutionEvent(ctx, e) case *admin.NodeExecutionEventRequest: topic = "cloudevents.NodeExecution" @@ -314,7 +327,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy cloudEvt := cloudevents.NewEvent() // CloudEvent specification: https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes cloudEvt.SetType(fmt.Sprintf("%v.%v", cloudEventTypePrefix, topic)) - cloudEvt.SetSource(cloudEventSource) + // According to the spec, the combination of source and id should be unique. + // Artifact service's uniqueness is project/domain/suffix. project/domain are available from the execution id. + // so set the suffix as the source. Can ignore ID since Artifact will only listen to succeeded events. + cloudEvt.SetSource(eventSource) cloudEvt.SetID(fmt.Sprintf("%v.%v", executionID, phase)) cloudEvt.SetTime(eventTime) cloudEvt.SetExtension(jsonSchemaURLKey, jsonSchemaURL) diff --git a/pkg/async/cloudevent/implementations/sender.go b/pkg/async/cloudevent/implementations/sender.go index 0b0c6f9eb..66bacbbaa 100644 --- a/pkg/async/cloudevent/implementations/sender.go +++ b/pkg/async/cloudevent/implementations/sender.go @@ -2,11 +2,11 @@ package implementations import ( "context" + "encoding/json" "fmt" "github.com/NYTimes/gizmo/pubsub" "github.com/Shopify/sarama" - pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/flyteorg/flytestdlib/logger" @@ -24,7 +24,7 @@ type PubSubSender struct { } func (s *PubSubSender) Send(ctx context.Context, notificationType string, event cloudevents.Event) error { - eventByte, err := pbcloudevents.Protobuf.Marshal(&event) + eventByte, err := json.Marshal(&event) if err != nil { logger.Errorf(ctx, "Failed to marshal cloudevent with error: %v", err) return err diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 2611a8f1c..c8dd7c68c 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -139,9 +139,16 @@ func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, return res } -// FlyteURLKeyFromNodeExecutionIDAndOutput is a modified version of the function above. -func FlyteURLKeyFromNodeExecutionIDAndOutput(nodeExecutionID core.NodeExecutionIdentifier, artifactType ArtifactType, outputName string) string { - res := fmt.Sprintf("%s/%s/%s/%s", nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId, artifactType, outputName) +// FlyteURLKeyFromNodeExecutionID is a modified version of the function above. +func FlyteURLKeyFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, artifactType ArtifactType) string { + res := fmt.Sprintf("%s/%s/%s", nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId, artifactType) + + return res +} + +// FlyteURLKeyFromNodeExecutionID is a modified version of the function above. +func FlyteURLKeyFromNodeExecutionIDRetry(nodeExecutionID core.NodeExecutionIdentifier, retry int, artifactType ArtifactType) string { + res := fmt.Sprintf("%s/%s/%s/%s", nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId, strconv.Itoa(retry), artifactType) return res } @@ -159,12 +166,3 @@ func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, } return res } - -// AppendLinksForLiteralMap returns a map of URLs for each output in the supplied literal map. -func AppendLinksForLiteralMap(baseOutputURL string, literalMap core.LiteralMap) map[string]string { - res := make(map[string]string, len(literalMap.Literals)) - for k, _ := range literalMap.Literals { - res[k] = fmt.Sprintf("%s/%s", baseOutputURL, k) - } - return res -} diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index bcc4362db..e638aaa4c 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -297,7 +297,8 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi } go func() { - if err := m.cloudEventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil { + ceCtx := context.TODO() + if err := m.cloudEventPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil { logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err) } }() From 59b147b1c60d228f15bfa693e681a2a7129ff891 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 25 Jul 2023 15:44:05 -0700 Subject: [PATCH 6/8] remove code that used to handle creation of artifact events Signed-off-by: Yee Hing Tong --- pkg/manager/impl/execution_manager.go | 126 --------------------- pkg/manager/impl/task_execution_manager.go | 68 ----------- 2 files changed, 194 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 0f5da69fb..429cbe188 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -1219,128 +1219,6 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime( watch.Observe(*executionModel.ExecutionCreatedAt, terminalEventTime) } -//// getAliases creates a list of aliases for a given output in a workflow execution. It should be called once per -//// output for a given workflow execution. -//func (m *ExecutionManager) getAliases(workflowID core.Identifier, execID core.WorkflowExecutionIdentifier, typedInterface core.TypedInterface, outputName string) ([]*artifact.Alias, error) { -// -// if v, ok := typedInterface.Outputs.Variables[outputName]; ok { -// defaultAlias := &artifact.Alias{ -// Name: fmt.Sprintf("%s/%s", workflowID.Name, outputName), -// Value: execID.Name, -// } -// -// if v.Artifact != nil && len(v.Artifact.Spec.Aliases) > 0 { -// aliases := make([]*artifact.Alias, 0, len(v.Artifact.Spec.Aliases)+1) -// aliases = append(aliases, defaultAlias) -// for _, a := range v.Artifact.Spec.Aliases { -// aliases = append(aliases, &artifact.Alias{ -// Name: a.Name, -// Value: a.Value, -// }) -// } -// return aliases, nil -// } -// -// // If nothing specified by the user, just return the default alias. -// return []*artifact.Alias{defaultAlias}, nil -// } -// return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "output [%s] not found in workflow interface [%v] for workflow [%v]", outputName, typedInterface, workflowID) -//} -// -//func (m *ExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.WorkflowExecutionEventRequest) { -// // Basic error checking -// if request.Event.ExecutionId == nil { -// logger.Warningf(ctx, "nil execution id in event request [%+v]", request) -// return -// } -// -// // TODO: Make this one call to the DB instead of two. -// executionModel, err := m.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{ -// Project: request.Event.ExecutionId.Project, -// Domain: request.Event.ExecutionId.Domain, -// Name: request.Event.ExecutionId.Name, -// }) -// ex, err := transformers.FromExecutionModel(ctx, executionModel, nil) -// if ex.Closure.WorkflowId == nil { -// logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex) -// return -// } -// workflowModel, err := m.db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{ -// Project: ex.Closure.WorkflowId.Project, -// Domain: ex.Closure.WorkflowId.Domain, -// Name: ex.Closure.WorkflowId.Name, -// Version: ex.Closure.WorkflowId.Version, -// }) -// var workflowInterface core.TypedInterface -// if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 { -// err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface) -// if err != nil { -// logger.Errorf(ctx, -// "Artifact eventing - failed to unmarshal TypedInterface for workflow [%+v] with err: %v", -// workflowModel.ID, err) -// return -// } -// } -// -// var outputs *core.LiteralMap -// if request.Event.GetOutputData() != nil { -// fmt.Printf("remove this - Got output data") -// outputs = request.Event.GetOutputData() -// } else if len(request.Event.GetOutputUri()) > 0 { -// fmt.Printf("remove this - Got output URI") -// // GetInputs actually fetches the data, even though this is an output -// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), -// m.storageClient, request.Event.GetOutputUri()) -// if err != nil { -// // TODO: metric this -// logger.Warningf(ctx, "Error fetching output literal map %v", request.Event) -// } -// } else { -// logger.Debugf(ctx, "Neither output data nor uri found for %v", request.Event) -// return -// } -// if outputs == nil { -// logger.Debugf(ctx, "Output data was nil for %v", request.Event) -// return -// } -// -// nodeExecutionID := core.NodeExecutionIdentifier{ -// NodeId: "end-node", -// ExecutionId: request.Event.ExecutionId, -// } -// -// for k, v := range outputs.Literals { -// // Use input type because workflow outputs are inputs to the end node. -// artifactKeySuffix := common.FlyteURLKeyFromNodeExecutionIDAndOutput(nodeExecutionID, common.ArtifactTypeI, k) -// -// aliases, err := m.getAliases(*ex.Closure.WorkflowId, *request.Event.ExecutionId, workflowInterface, k) -// if err != nil { -// logger.Errorf(ctx, "Failed getting alias for [%s] in workflow [%v], err: %v", k, ex.Closure.WorkflowId, err) -// } -// as := artifact.ArtifactSpec{ -// Value: v, -// Source: &artifact.ArtifactSpec_Execution{ -// Execution: request.Event.ExecutionId, -// }, -// Aliases: aliases, -// } -// ak := core.ArtifactKey{ -// Project: request.Event.ExecutionId.Project, -// Domain: request.Event.ExecutionId.Domain, -// Suffix: artifactKeySuffix, -// } -// -// a := artifact.CreateArtifactRequest{ -// ArtifactKey: &ak, -// Spec: &as, -// } -// e := event.ArtifactCreateEvent{ -// CreateRequest: &a, -// } -// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) -// } -//} - func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) ( *admin.WorkflowExecutionEventResponse, error) { err := validation.ValidateCreateWorkflowEventRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes) @@ -1426,10 +1304,6 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi if request.Event.GetOutputData() != nil { m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } - //go func() { - // logger.Debugf(ctx, "Emitting workflow success artifact event flow for [%+v]", request) - // m.handleArtifactEventEmitting(ctx, request) - //}() err = m.publishNotifications(ctx, request, *executionModel) if err != nil { diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 9378006a9..8a371e8cf 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -129,70 +129,6 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState( return *existingTaskExecution, nil } -//func (m *TaskExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.TaskExecutionEventRequest, taskExecutionID core.TaskExecutionIdentifier) { -// -// taskModel, err := m.db.TaskRepo().Get(ctx, repoInterfaces.Identifier{ -// Project: request.Event.TaskId.Project, -// Domain: request.Event.TaskId.Domain, -// Name: request.Event.TaskId.Name, -// Version: request.Event.TaskId.Version, -// }) -// if err != nil { -// // TODO: metric this -// logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", request.Event.TaskId, err) -// return -// } -// task, err := transformers.FromTaskModel(taskModel) -// task.Closure.CompiledTask.Template.Interface -// -// var outputs *core.LiteralMap -// if request.Event.GetOutputData() != nil { -// fmt.Printf("remove this - Got output data") -// outputs = request.Event.GetOutputData() -// } else if len(request.Event.GetOutputUri()) > 0 { -// fmt.Printf("remove this - Got output URI") -// // GetInputs actually fetches the data, even though this is an output -// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), -// m.storageClient, request.Event.GetOutputUri()) -// if err != nil { -// fmt.Printf("Error fetching output literal map %v", request.Event) -// } -// } else { -// fmt.Printf("No output data found for %v\n", request.Event) -// } -// if outputs != nil { -// urls := common.FlyteURLsFromTaskExecutionID(taskExecutionID, false) -// outputURLs := common.AppendLinksForLiteralMap(urls.GetOutputs(), *outputs) -// for k, v := range outputs.Literals { -// as := artifact.ArtifactSpec{ -// Value: v, -// // Type, tags and aliases will need to be filled in later -// Source: &artifact.ArtifactSpec_TaskExecution{ -// TaskExecution: &taskExecutionID, -// }, -// } -// ak := core.ArtifactKey{ -// Project: request.Event.ParentNodeExecutionId.ExecutionId.Project, -// Domain: request.Event.ParentNodeExecutionId.ExecutionId.Domain, -// // This will need to be filled in later, will need to pull from task template, or set to -// // something pretty unique, like the task ID. -// Name: "", -// } -// -// a := artifact.CreateArtifactRequest{ -// ArtifactKey: &ak, -// Version: request.Event.ParentNodeExecutionId.ExecutionId.Name, -// Uri: outputURLs[k], -// Spec: &as, -// } -// e := event.ArtifactCreateEvent{ -// CreateRequest: &a, -// } -// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e)) -// } -// } -//} - func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) ( *admin.TaskExecutionEventResponse, error) { if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil { @@ -266,10 +202,6 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req if request.Event.GetOutputData() != nil { m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData()))) } - //go func() { - // logger.Debugf(ctx, "Emitting task execution artifacts for [%+v] [%+v]", taskExecutionID, request) - // m.handleArtifactEventEmitting(ctx, request, taskExecutionID) - //}() } if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil { From 5a63652075ee121ce8b6afd789e3194f9e4cc4be Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 1 Aug 2023 11:20:07 -0700 Subject: [PATCH 7/8] parameter and literal map construction wrong Signed-off-by: Yee Hing Tong --- flyteadmin_config.yaml | 4 + pkg/artifacts/artifact_client.go | 37 ++++ pkg/artifacts/config.go | 7 + pkg/manager/impl/execution_manager.go | 183 ++++++++++++++---- .../impl/validation/execution_validator.go | 22 +-- pkg/rpc/adminservice/base.go | 12 +- pkg/runtime/application_config_provider.go | 8 + .../interfaces/application_configuration.go | 2 + 8 files changed, 217 insertions(+), 58 deletions(-) create mode 100644 pkg/artifacts/artifact_client.go create mode 100644 pkg/artifacts/config.go diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index ab56c8895..b42b742d6 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -60,6 +60,10 @@ flyteadmin: - "metadata" - "admin" useOffloadedWorkflowClosure: false +artifacts: + host: localhost + port: 50051 + insecure: true database: postgres: port: 30001 diff --git a/pkg/artifacts/artifact_client.go b/pkg/artifacts/artifact_client.go new file mode 100644 index 000000000..b29ebc141 --- /dev/null +++ b/pkg/artifacts/artifact_client.go @@ -0,0 +1,37 @@ +package artifacts + +import ( + "context" + "crypto/tls" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flytestdlib/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + if opts == nil { + // Initialize opts list to the potential number of options we will add. Initialization optimizes memory + // allocation. + opts = make([]grpc.DialOption, 0, 5) + } + + if cfg.Insecure { + opts = append(opts, grpc.WithInsecure()) + } else { + tlsConfig := &tls.Config{} //nolint + creds := credentials.NewTLS(tlsConfig) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } + + return grpc.Dial(cfg.Host, opts...) +} + +func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient { + conn, err := NewArtifactConnection(ctx, cfg, opts...) + if err != nil { + logger.Panicf(ctx, "failed to initialize Artifact connection. Err: %s", err.Error()) + panic(err) + } + return artifact.NewArtifactRegistryClient(conn) +} diff --git a/pkg/artifacts/config.go b/pkg/artifacts/config.go new file mode 100644 index 000000000..addeedaf6 --- /dev/null +++ b/pkg/artifacts/config.go @@ -0,0 +1,7 @@ +package artifacts + +type Config struct { + Host string `json:"host"` + Port int `json:"port"` + Insecure bool `json:"insecure"` +} diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 429cbe188..5fc3f5db1 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -43,6 +43,7 @@ import ( runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" workflowengineInterfaces "github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "google.golang.org/grpc/codes" @@ -99,6 +100,7 @@ type ExecutionManager struct { cloudEventPublisher notificationInterfaces.Publisher dbEventWriter eventWriter.WorkflowExecutionEventWriter pluginRegistry *plugins.Registry + artifactClient *artifact.ArtifactRegistryClient } func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context { @@ -450,7 +452,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, map[string]*core.ArtifactID, error) { + context.Context, *models.Execution, error) { taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: request.Spec.LaunchPlan.Project, @@ -459,11 +461,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( Version: request.Spec.LaunchPlan.Version, }) if err != nil { - return nil, nil, nil, err + return nil, nil, err } task, err := transformers.FromTaskModel(taskModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Prepare a skeleton workflow @@ -472,15 +474,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task) if err != nil { logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err) - return nil, nil, nil, err + return nil, nil, err } workflow, err := transformers.FromWorkflowModel(*workflowModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -488,10 +490,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier, workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( + executionInputs, err := validation.CheckAndFetchInputsForExecution( request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, @@ -500,7 +502,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, nil, err + return nil, nil, err } name := util.GetExecutionName(request) @@ -518,14 +520,14 @@ func (m *ExecutionManager) launchSingleTaskExecution( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) - requestSpec.Metadata.ArtifactIds = resolvedArtifactMap + //requestSpec.Metadata.ArtifactIds = resolvedArtifactMap // Get the node execution (if any) that launched this execution var parentNodeExecutionID uint var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Dynamically assign task resource defaults. @@ -539,15 +541,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var labels map[string]string @@ -557,7 +559,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var annotations map[string]string @@ -572,7 +574,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -590,7 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") if err != nil { - return nil, nil, nil, err + return nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -615,7 +617,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, request.Inputs, err) - return nil, nil, nil, err + return nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -656,10 +658,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, nil, err + return nil, nil, err } m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs))) - return ctx, executionModel, resolvedArtifactMap, nil + return ctx, executionModel, nil } func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { @@ -707,14 +709,78 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se } } +// ResolveLiteralMapArtifacts will go through the input literal map and resolve any artifact references to their +// literal values. +func (m *ExecutionManager) ResolveLiteralMapArtifacts(ctx context.Context, inputs *core.LiteralMap) (*core.LiteralMap, []*core.ArtifactID, error) { + // only top level replace for now. + var artifactIDs []*core.ArtifactID + outputs := map[string]*core.Literal{} + for k, v := range inputs.Literals { + if v.GetArtifactId() != nil { + if m.artifactClient == nil { + return nil, nil, errors.NewFlyteAdminErrorf(codes.Internal, "artifact client is not initialized") + } + + x := *m.artifactClient + req := &artifact.GetArtifactRequest{ + Identifier: &artifact.GetArtifactRequest_ArtifactId{ArtifactId: v.GetArtifactId()}, + Details: false, + } + resp, err := x.GetArtifact(ctx, req) + if err != nil { + return nil, nil, err + } + artifactIDs = append(artifactIDs, resp.Artifact.GetArtifactId()) + logger.Debugf(ctx, "Resolved artifact for [%s] to [%+v]", k, resp.GetArtifact().ArtifactId) + outputs[k] = resp.Artifact.Spec.Value + } else { + outputs[k] = v + } + } + lm := &core.LiteralMap{Literals: outputs} + return lm, artifactIDs, nil + +} + +// ResolveParameterMapArtifacts will go through the parameter map, and resolve any artifact queries. +func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inputs *core.ParameterMap) (*core.ParameterMap, []*core.ArtifactID, error) { + // only top level replace for now. + var artifactIDs []*core.ArtifactID + outputs := map[string]*core.Parameter{} + x := *m.artifactClient + + for k, v := range inputs.Parameters { + if v.GetArtifactQuery() != nil { + if m.artifactClient == nil { + return nil, nil, errors.NewFlyteAdminErrorf(codes.Internal, "artifact client is not initialized, can't resolve queries") + } + + req := &artifact.GetArtifactRequest{ + Identifier: &artifact.GetArtifactRequest_Query{Query: v.GetArtifactQuery()}, + Details: false, + } + + resp, err := x.GetArtifact(ctx, req) + if err != nil { + return nil, nil, err + } + artifactIDs = append(artifactIDs, resp.Artifact.GetArtifactId()) + logger.Debugf(ctx, "Resolved query for [%s] to [%+v]", k, resp.Artifact.ArtifactId) + outputs[k] = &core.Parameter{ + Var: v.Var, + Behavior: &core.Parameter_Default{Default: resp.Artifact.Spec.Value}, + } + } else { + outputs[k] = v + } + } + pm := &core.ParameterMap{Parameters: outputs} + return pm, artifactIDs, nil +} + func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, map[string]*core.ArtifactID, error) { - - // Resolve artifacts. - // two sources of artifacts: launch plan and create execute request. - // - within the launch plan, the artifact will be in the Parameter map, and can come in the Literal, - // or as an ArtifactQuery. + context.Context, *models.Execution, []*core.ArtifactID, error) { err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration()) if err != nil { @@ -723,7 +789,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( } if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK { logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan) - return m.launchSingleTaskExecution(ctx, request, requestedAt) + // When tasks can have defaults this will need to handle Artifacts as well. + ctx, model, err := m.launchSingleTaskExecution(ctx, request, requestedAt) + // Since single task doesn't leverage artifact yet, inject a nil artifact list. + return ctx, model, nil, err } launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan) @@ -736,18 +805,51 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) return nil, nil, nil, err } + + // Resolve artifacts. + // two sources of artifacts: launch plan and create execute request. + // - within the create request, artifacts will be pinned in the Literals. + // - within the launch plan, the artifact will be in the Parameter map, and can come in the Literal, + // or as an ArtifactQuery. + // All three components that comprise the inputs may contain artifacts. + var foundArtifacts []*core.ArtifactID + resolvedRequestInputs, as, err := m.ResolveLiteralMapArtifacts(ctx, request.Inputs) + if err != nil { + logger.Errorf(ctx, "Error looking up request.Inputs for artifacts: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + resolvedFixedInputs, as, err := m.ResolveLiteralMapArtifacts(ctx, launchPlan.Spec.FixedInputs) + if err != nil { + logger.Errorf(ctx, "Error looking up launch plan fixed inputs for artifacts: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + resolvedExpectedInputs, as, err := m.ResolveParameterMapArtifacts(ctx, launchPlan.Closure.ExpectedInputs) + if err != nil { + logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + if len(foundArtifacts) > 0 { + logger.Debugf(ctx, "Resolved request.Inputs from [%+v] to [%+v]", request.Inputs, resolvedRequestInputs) + logger.Debugf(ctx, "Resolved launch plan fixed inputs from [%+v] to [%+v]", launchPlan.Spec.FixedInputs, resolvedFixedInputs) + logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs) + logger.Debugf(ctx, "Found artifacts: %v", foundArtifacts) + } + // Artifacts retrieved will need to be stored somewhere to ensure that we can re-emit events if necessary // in the future, and also to make sure that relaunch and recover can use it if necessary. - executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( - request.Inputs, - launchPlan.Spec.FixedInputs, - launchPlan.Closure.ExpectedInputs, + executionInputs, err := validation.CheckAndFetchInputsForExecution( + resolvedRequestInputs, + resolvedFixedInputs, + resolvedExpectedInputs, ) if err != nil { - logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ + logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with resolvedRequestInputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", - request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) + resolvedRequestInputs, resolvedFixedInputs, resolvedExpectedInputs, err) return nil, nil, nil, err } @@ -781,7 +883,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) - requestSpec.Metadata.ArtifactIds = resolvedArtifactMap + requestSpec.Metadata.ArtifactIds = foundArtifacts // Get the node and parent execution (if any) that launched this execution var parentNodeExecutionID uint @@ -804,7 +906,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, nil, err } - userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) + userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, resolvedRequestInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { return nil, nil, nil, err } @@ -923,7 +1025,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( workflowExecutionID, err) return nil, nil, nil, err } - return ctx, executionModel, resolvedArtifactMap, nil + return ctx, executionModel, foundArtifacts, nil } // Inserts an execution model into the database store and emits platform metrics. @@ -947,7 +1049,8 @@ func (m *ExecutionManager) createExecutionModel( return &workflowExecutionIdentifier, nil } -func (m *ExecutionManager) handleArtifactEvents(artifactIDs map[string]*core.ArtifactID, wfExecID core.WorkflowExecutionIdentifier) { +func (m *ExecutionManager) handleArtifactEvents(artifactIDs []*core.ArtifactID, wfExecID core.WorkflowExecutionIdentifier) { + // todo: Proper events need to be created for this. if artifactIDs != nil { fmt.Printf("WF exec used %v", wfExecID.String()) for _, artifactID := range artifactIDs { @@ -967,7 +1070,7 @@ func (m *ExecutionManager) CreateExecution( } var executionModel *models.Execution var err error - var artifactIDs map[string]*core.ArtifactID + var artifactIDs []*core.ArtifactID ctx, executionModel, artifactIDs, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) if err != nil { return nil, err @@ -1659,7 +1762,8 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher, - eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface { + eventWriter eventWriter.WorkflowExecutionEventWriter, artifactClient *artifact.ArtifactRegistryClient) interfaces.ExecutionInterface { + queueAllocator := executions.NewQueueAllocator(config, db) systemMetrics := newExecutionSystemMetrics(systemScope) @@ -1692,6 +1796,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu cloudEventPublisher: cloudEventPublisher, dbEventWriter: eventWriter, pluginRegistry: pluginRegistry, + artifactClient: artifactClient, } } diff --git a/pkg/manager/impl/validation/execution_validator.go b/pkg/manager/impl/validation/execution_validator.go index 6920f44e4..a1c2a1514 100644 --- a/pkg/manager/impl/validation/execution_validator.go +++ b/pkg/manager/impl/validation/execution_validator.go @@ -79,11 +79,10 @@ func ValidateExecutionRequest(ctx context.Context, request admin.ExecutionCreate // CheckAndFetchInputsForExecution will merge inputs and also resolve any artifacts that are required. // A map will be returned for all artifacts used. func CheckAndFetchInputsForExecution( - userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, map[string]*core.ArtifactID, error) { + userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, error) { executionInputMap := map[string]*core.Literal{} expectedInputMap := map[string]*core.Parameter{} - resolvedArtifactMap := map[string]*core.ArtifactID{} if expectedInputs != nil && len(expectedInputs.GetParameters()) > 0 { expectedInputMap = expectedInputs.GetParameters() @@ -92,7 +91,7 @@ func CheckAndFetchInputsForExecution( if userInputs != nil && len(userInputs.GetLiterals()) > 0 { for name, value := range userInputs.GetLiterals() { if _, ok := expectedInputMap[name]; !ok { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) } executionInputMap[name] = value } @@ -101,7 +100,7 @@ func CheckAndFetchInputsForExecution( for name, expectedInput := range expectedInputMap { if _, ok := executionInputMap[name]; !ok { if expectedInput.GetRequired() { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) } // Look up from Artifact service if necessary if expectedInput.GetArtifactQuery() != nil { @@ -114,7 +113,7 @@ func CheckAndFetchInputsForExecution( } else { inputType := validators.LiteralTypeForLiteral(executionInputMap[name]) if !validators.AreTypesCastable(inputType, expectedInput.GetVar().GetType()) { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) } } } @@ -122,24 +121,15 @@ func CheckAndFetchInputsForExecution( if fixedInputs != nil && len(fixedInputs.GetLiterals()) > 0 { for name, fixedInput := range fixedInputs.GetLiterals() { if _, ok := executionInputMap[name]; ok { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) } executionInputMap[name] = fixedInput } } - // Resolve any artifacts that are required. - for name, input := range executionInputMap { - if input.GetArtifactId() != nil { - resolvedArtifactMap[name] = input.GetArtifactId() - // Replace the reference with the actual literal - // executionInputMap[name] = artifactService.GetArtifact(input.GetArtifactId()) - } - } - return &core.LiteralMap{ Literals: executionInputMap, - }, resolvedArtifactMap, nil + }, nil } func CheckValidExecutionID(executionID, fieldName string) error { diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 4e3ee7f8a..d206e182c 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -14,10 +14,11 @@ import ( eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/implementations" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" - "github.com/flyteorg/flyteadmin/pkg/manager/impl/resources" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" + artifactClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteadmin/pkg/async/notifications" "github.com/flyteorg/flyteadmin/pkg/async/schedule" "github.com/flyteorg/flyteadmin/pkg/data" @@ -142,9 +143,14 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi executionEventWriter.Run() }() + var artifactsClient *artifact.ArtifactRegistryClient + if configuration.ApplicationConfiguration().GetArtifactsConfig() != nil { + c := artifactClient.InitializeArtifactClient(ctx, configuration.ApplicationConfiguration().GetArtifactsConfig()) + artifactsClient = &c + } executionManager := manager.NewExecutionManager(repo, pluginRegistry, configuration, dataStorageClient, adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"), - publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter) + publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter, artifactsClient) versionManager := manager.NewVersionManager() scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager) diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 3b8b0a270..b708ba837 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -1,6 +1,7 @@ package runtime import ( + artifactsClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteadmin/pkg/common" "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flytestdlib/config" @@ -15,6 +16,7 @@ const notifications = "notifications" const domains = "domains" const externalEvents = "externalEvents" const cloudEvents = "cloudEvents" +const artifact = "artifacts" const metricPort = 10254 const KB = 1024 @@ -84,6 +86,8 @@ var cloudEventsConfig = config.MustRegisterSection(cloudEvents, &interfaces.Clou Type: common.Local, }) +var artifactsConfig = config.MustRegisterSection(artifact, &artifactsClient.Config{}) + // Implementation of an interfaces.ApplicationConfiguration type ApplicationConfigurationProvider struct{} @@ -119,6 +123,10 @@ func (p *ApplicationConfigurationProvider) GetCloudEventsConfig() *interfaces.Cl return cloudEventsConfig.GetConfig().(*interfaces.CloudEventsConfig) } +func (p *ApplicationConfigurationProvider) GetArtifactsConfig() *artifactsClient.Config { + return artifactsConfig.GetConfig().(*artifactsClient.Config) +} + func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration { return &ApplicationConfigurationProvider{} } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index aeb0ab34e..8b59d1404 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,7 @@ package interfaces import ( + artifactsClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" @@ -592,4 +593,5 @@ type ApplicationConfiguration interface { GetDomainsConfig() *DomainsConfig GetExternalEventsConfig() *ExternalEventsConfig GetCloudEventsConfig() *CloudEventsConfig + GetArtifactsConfig() *artifactsClient.Config } From ae78b900d753b2fcc7d9975678adee0e06e073eb Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 1 Aug 2023 14:00:55 -0700 Subject: [PATCH 8/8] add port, add nil checks Signed-off-by: Yee Hing Tong --- pkg/artifacts/artifact_client.go | 3 ++- pkg/manager/impl/execution_manager.go | 8 +++++++- pkg/manager/impl/validation/validation.go | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/artifacts/artifact_client.go b/pkg/artifacts/artifact_client.go index b29ebc141..48cb30591 100644 --- a/pkg/artifacts/artifact_client.go +++ b/pkg/artifacts/artifact_client.go @@ -3,6 +3,7 @@ package artifacts import ( "context" "crypto/tls" + "fmt" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flytestdlib/logger" "google.golang.org/grpc" @@ -24,7 +25,7 @@ func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOpti opts = append(opts, grpc.WithTransportCredentials(creds)) } - return grpc.Dial(cfg.Host, opts...) + return grpc.Dial(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), opts...) } func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient { diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 5fc3f5db1..757b83c90 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -712,8 +712,11 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se // ResolveLiteralMapArtifacts will go through the input literal map and resolve any artifact references to their // literal values. func (m *ExecutionManager) ResolveLiteralMapArtifacts(ctx context.Context, inputs *core.LiteralMap) (*core.LiteralMap, []*core.ArtifactID, error) { - // only top level replace for now. var artifactIDs []*core.ArtifactID + if inputs == nil { + return nil, artifactIDs, nil + } + // only top level replace for now. outputs := map[string]*core.Literal{} for k, v := range inputs.Literals { if v.GetArtifactId() != nil { @@ -746,6 +749,9 @@ func (m *ExecutionManager) ResolveLiteralMapArtifacts(ctx context.Context, input func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inputs *core.ParameterMap) (*core.ParameterMap, []*core.ArtifactID, error) { // only top level replace for now. var artifactIDs []*core.ArtifactID + if inputs == nil { + return nil, artifactIDs, nil + } outputs := map[string]*core.Parameter{} x := *m.artifactClient diff --git a/pkg/manager/impl/validation/validation.go b/pkg/manager/impl/validation/validation.go index e6c7cfae2..6df304425 100644 --- a/pkg/manager/impl/validation/validation.go +++ b/pkg/manager/impl/validation/validation.go @@ -256,7 +256,7 @@ func validateParameterMap(inputMap *core.ParameterMap, fieldName string) error { "The Variable component of the Parameter %s in %s either is missing, or has a missing Type", name, fieldName) } - if defaultInput.GetDefault() == nil && !defaultInput.GetRequired() { + if defaultInput.GetDefault() == nil && defaultInput.GetArtifactQuery() == nil && !defaultInput.GetRequired() { return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Invalid variable %s in %s - variable has neither default, nor is required. "+ "One must be specified", name, fieldName)