Skip to content

Commit

Permalink
WIP: Add preprocessing child workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jraddaoui committed Mar 18, 2024
1 parent b63bb1a commit 6132a18
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 15 deletions.
28 changes: 27 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,34 @@ KUBE_OVERLAY = 'hack/kube/overlays/dev-a3m'
if PRES_SYS == 'am':
KUBE_OVERLAY = 'hack/kube/overlays/dev-am'

# Load Kustomize YAML
yaml = kustomize(KUBE_OVERLAY)

# Preprocessing: optional integration with an external preprocessing project.
# It is only enabled when the PREPROCESSING_PATH environment variable is set
# to a non-empty value.
PREP_PATH = os.environ.get("PREPROCESSING_PATH", "")
if PREP_PATH != "":
# Load the Enduro-specific Tiltfile shipped in the preprocessing checkout
# located at PREP_PATH.
load_dynamic(PREP_PATH + "/Tiltfile.enduro")
# Deploying with shared filesystem: enabled when PREPROCESSING_SHARING_FS
# (lowercased) is one of the values in `true`.
# NOTE(review): `true` is not defined in this section; presumably it is a
# collection of truthy strings declared earlier in the Tiltfile - confirm.
PREP_SHARE = os.environ.get("PREPROCESSING_SHARING_FS", "").lower() in true
if PREP_SHARE:
# Split the Enduro a3m/am worker k8s manifest out of the Kustomize output:
# `pres_yaml` receives the matching object, `yaml` keeps everything else.
if PRES_SYS == "a3m":
pres_yaml, yaml = filter_yaml(yaml, name="^enduro-a3m$", kind="StatefulSet")
else:
pres_yaml, yaml = filter_yaml(yaml, name="^enduro-am$", kind="Deployment")
# Append preprocessing volume and volume mount to worker container,
# this will only work in single node k8s cluster deployments.
# NOTE(review): the "/tmp" mount path presumably has to match
# `preprocessing.sharedPath` in enduro.toml - confirm.
volume = {"name": "shared-dir", "persistentVolumeClaim": {"claimName": "preprocessing-pvc"}}
volume_mount = {"name": "shared-dir", "mountPath": "/tmp"}
pres_obj = decode_yaml(pres_yaml)
# NOTE(review): both appends assume the manifest already declares a
# `volumes` list and that the first container declares `volumeMounts`.
pres_obj["spec"]["template"]["spec"]["volumes"].append(volume)
pres_obj["spec"]["template"]["spec"]["containers"][0]["volumeMounts"].append(volume_mount)
pres_yaml = encode_yaml(pres_obj)
# Recombine the untouched objects with the patched worker manifest.
yaml = [yaml, pres_yaml]

# Load Kubernetes resources
k8s_yaml(kustomize(KUBE_OVERLAY))
k8s_yaml(yaml)

# Configure trigger mode
trigger_mode = TRIGGER_MODE_MANUAL
Expand Down
10 changes: 10 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,13 @@ bucket = "sips"
enabled = false
address = ""
samplingRatio = 1.0

[preprocessing]
enabled = true
extract = true
sharedPath = "/tmp"

[preprocessing.temporal]
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"
24 changes: 13 additions & 11 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/api"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
Expand All @@ -30,17 +31,18 @@ type Configuration struct {
Debug bool
DebugListen string

A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Preservation pres.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config
A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Preservation pres.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config
Preprocessing preprocessing.Config
}

func (c Configuration) Validate() error {
Expand Down
4 changes: 2 additions & 2 deletions internal/package_/preservation_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (svc *packageImpl) readPreservationAction(ctx context.Context, ID uint) (*g
WorkflowID: dbItem.WorkflowID,
Type: dbItem.Type.String(),
Status: dbItem.Status.String(),
StartedAt: ref.Deref(formatOptionalTime(dbItem.StartedAt)),
StartedAt: ref.DerefZero(formatOptionalTime(dbItem.StartedAt)),
CompletedAt: formatOptionalTime(dbItem.CompletedAt),
PackageID: ref.New(dbItem.PackageID),
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func (svc *packageImpl) readPreservationTask(ctx context.Context, ID uint) (*goa
TaskID: dbItem.TaskID,
Name: dbItem.Name,
Status: dbItem.Status.String(),
StartedAt: ref.Deref(formatOptionalTime(dbItem.StartedAt)),
StartedAt: ref.DerefZero(formatOptionalTime(dbItem.StartedAt)),
CompletedAt: formatOptionalTime(dbItem.CompletedAt),
Note: ref.New(dbItem.Note),
PreservationActionID: ref.New(dbItem.PreservationActionID),
Expand Down
28 changes: 28 additions & 0 deletions internal/preprocessing/preprocessing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package preprocessing

// Config holds the settings of the optional preprocessing child workflow that
// the processing workflow can run on a transfer.
type Config struct {
// Enable preprocessing child workflow. When false the processing workflow
// does not start the child workflow at all.
Enabled bool
// Extract package in preprocessing. When true the processing workflow
// skips its own unarchive step for transfers that are not directories.
Extract bool
// Local path shared with Enduro. Paths exchanged with the child workflow
// are expressed relative to this directory.
SharedPath string
// Temporal configuration used to start the child workflow.
Temporal Temporal
}

// Temporal identifies where and under which name the preprocessing child
// workflow is started.
type Temporal struct {
// Namespace is the Temporal namespace set in the child workflow options.
Namespace string
// TaskQueue is the task queue set in the child workflow options.
TaskQueue string
// WorkflowName is the workflow type name to execute; it is also used as
// the prefix of the generated child workflow ID.
WorkflowName string
}

// WorkflowParams is the input passed to the preprocessing child workflow.
type WorkflowParams struct {
// Relative path to the shared path: the location of the transfer to
// preprocess, expressed relative to Config.SharedPath.
RelativePath string
}

// WorkflowResult is the output returned by the preprocessing child workflow.
type WorkflowResult struct {
// Relative path to the shared path: the location of the transfer after
// preprocessing, expressed relative to Config.SharedPath.
RelativePath string
}
47 changes: 46 additions & 1 deletion internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ package workflow
import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/go-logr/logr"
"github.com/google/uuid"
"go.artefactual.dev/tools/ref"
temporalapi_enums "go.temporal.io/api/enums/v1"
temporalsdk_temporal "go.temporal.io/sdk/temporal"
temporalsdk_workflow "go.temporal.io/sdk/workflow"

Expand All @@ -21,6 +23,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
Expand Down Expand Up @@ -309,7 +312,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}

// Unarchive the transfer if it's not a directory.
if !tinfo.req.IsDir {
if !tinfo.req.IsDir && !w.cfg.Preprocessing.Extract {
activityOpts := withActivityOptsForLocalAction(sessCtx)
var result activities.UnarchiveActivityResult
err := temporalsdk_workflow.ExecuteActivity(
Expand All @@ -327,6 +330,11 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
tinfo.req.IsDir = result.IsDir
}

// Preprocessing child workflow.
if err := w.preprocessing(sessCtx, tinfo); err != nil {
return err
}

// Bundle.
{
// For the a3m workflow bundle the transfer to a directory shared with
Expand Down Expand Up @@ -775,3 +783,40 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti

return nil
}

// preprocessing runs the preprocessing child workflow for the transfer
// described by tinfo. It returns nil without doing anything when
// preprocessing is disabled in the configuration.
//
// The current transfer location (tinfo.TempPath) is sent to the child
// workflow as a path relative to the configured shared path. When the child
// workflow succeeds, tinfo.TempPath is replaced with the location it reported
// (resolved against the shared path) and the transfer is flagged as a
// directory. Any error from the child workflow is returned unchanged.
//
// TODO:
// - Allow using a different Temporal instance?
// - Make retry policy and timeouts configurable?
// - Enable remote options.
// - Move transfer if tinfo.TempPath is not inside w.cfg.Preprocessing.SharedPath.
func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tinfo *TransferInfo) error {
	prepCfg := w.cfg.Preprocessing
	if !prepCfg.Enabled {
		return nil
	}

	relPath, err := filepath.Rel(prepCfg.SharedPath, tinfo.TempPath)
	if err != nil {
		return fmt.Errorf("preprocessing: make transfer path relative to shared path: %w", err)
	}

	// Workflow code is re-executed on replay and must be deterministic.
	// Generating the random part of the child workflow ID inside a side effect
	// records it in the workflow history, so a replay reuses the same ID
	// instead of producing a new one that no longer matches the history.
	var idSuffix string
	err = temporalsdk_workflow.SideEffect(ctx, func(temporalsdk_workflow.Context) any {
		return uuid.New().String()
	}).Get(&idSuffix)
	if err != nil {
		return fmt.Errorf("preprocessing: generate child workflow ID: %w", err)
	}

	preCtx := temporalsdk_workflow.WithChildOptions(ctx, temporalsdk_workflow.ChildWorkflowOptions{
		Namespace:         prepCfg.Temporal.Namespace,
		TaskQueue:         prepCfg.Temporal.TaskQueue,
		WorkflowID:        fmt.Sprintf("%s-%s", prepCfg.Temporal.WorkflowName, idSuffix),
		ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_TERMINATE,
	})

	var result preprocessing.WorkflowResult
	err = temporalsdk_workflow.ExecuteChildWorkflow(
		preCtx,
		prepCfg.Temporal.WorkflowName,
		preprocessing.WorkflowParams{RelativePath: relPath},
	).Get(preCtx, &result)
	if err != nil {
		// Returned as-is so the Temporal error type reaches the caller intact.
		return err
	}

	// The child workflow reports the new location relative to the shared path.
	tinfo.TempPath = filepath.Join(prepCfg.SharedPath, filepath.Clean(result.RelativePath))
	tinfo.req.IsDir = true

	return nil
}

0 comments on commit 6132a18

Please sign in to comment.