forked from arcalot/arcaflow-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.go
138 lines (119 loc) · 3.71 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Package engine provides an embeddable engine variant.
package engine
import (
"context"
"fmt"
log "go.arcalot.io/log/v2"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/workflow"
"go.flow.arcalot.io/pluginsdk/schema"
"go.flow.arcalot.io/deployer/registry"
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/internal/yaml"
)
// WorkflowEngine is responsible for executing workflows and returning their result.
type WorkflowEngine interface {
// RunWorkflow is a simplified shortcut to parse and immediately run a workflow.
RunWorkflow(
ctx context.Context,
input []byte,
workflowContext map[string][]byte,
workflowFileName string,
) (outputID string, outputData any, outputError bool, err error)
// Parse ingests a workflow context as a map of files to their contents and a workflow file name and
// parses the data into an executable workflow.
Parse(
workflowContext map[string][]byte,
workflowFileName string,
) (
workflow Workflow,
err error,
)
}
// Workflow is a runnable, queryable workflow. You can execute it, or query it for schema information.
type Workflow interface {
// Run executes the workflow with the passed, YAML-formatted input data.
Run(
ctx context.Context,
input []byte,
) (
outputID string,
outputData any,
outputIsError bool,
err error,
)
// InputSchema returns the requested input schema for the workflow.
InputSchema() schema.Scope
// Outputs returns the list of possible outputs and their schema for the workflow.
Outputs() map[string]schema.StepOutput
}
type workflowEngine struct {
logger log.Logger
deployerRegistry registry.Registry
stepRegistry step.Registry
config *config.Config
}
func (w workflowEngine) RunWorkflow(ctx context.Context, input []byte, workflowContext map[string][]byte, workflowFileName string) (outputID string, outputData any, outputError bool, err error) {
wf, err := w.Parse(workflowContext, workflowFileName)
if err != nil {
return "", nil, true, err
}
return wf.Run(ctx, input)
}
func (w workflowEngine) Parse(
files map[string][]byte,
workflowFileName string,
) (Workflow, error) {
if workflowFileName == "" {
workflowFileName = "workflow.yaml"
}
workflowContents, ok := files[workflowFileName]
if !ok {
return nil, ErrNoWorkflowFile
}
yamlConverter := workflow.NewYAMLConverter(w.stepRegistry)
wf, err := yamlConverter.FromYAML(workflowContents)
if err != nil {
return nil, err
}
executor, err := workflow.NewExecutor(w.logger, w.config, w.stepRegistry)
if err != nil {
return nil, err
}
preparedWorkflow, err := executor.Prepare(wf, files)
if err != nil {
return nil, err
}
return &engineWorkflow{
workflow: preparedWorkflow,
}, nil
}
type engineWorkflow struct {
workflow workflow.ExecutableWorkflow
}
func (e engineWorkflow) Run(ctx context.Context, input []byte) (outputID string, outputData any, outputIsError bool, err error) {
decodedInput, err := yaml.New().Parse(input)
if err != nil {
return "", nil, true, fmt.Errorf("failed to YAML decode input (%w)", err)
}
outputID, outputData, err = e.workflow.Execute(ctx, decodedInput.Raw())
if err != nil {
return "", nil, true, err
}
outputSchema, ok := e.workflow.OutputSchema()[outputID]
if !ok {
return "", nil, true, fmt.Errorf("bug: the output schema has no output named '%s'", outputID)
}
return outputID, outputData, outputSchema.Error(), nil
}
func (e engineWorkflow) InputSchema() schema.Scope {
return e.workflow.Input()
}
func (e engineWorkflow) Outputs() map[string]schema.StepOutput {
outputSchema := e.workflow.OutputSchema()
outputs := make(map[string]schema.StepOutput, len(outputSchema))
for outputID, output := range outputSchema {
outputs[outputID] = output
}
return outputs
}