Skip to content

Commit

Permalink
Move OrchestrationMetadata to proto (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshVanL authored Dec 5, 2024
1 parent 4232880 commit 327ccdd
Show file tree
Hide file tree
Showing 18 changed files with 4,158 additions and 1,838 deletions.
189 changes: 9 additions & 180 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package api
import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -49,18 +47,6 @@ type OrchestrationIdReusePolicy = protos.OrchestrationIdReusePolicy
// InstanceID is a unique identifier for an orchestration instance.
type InstanceID string

type OrchestrationMetadata struct {
InstanceID InstanceID
Name string
RuntimeStatus protos.OrchestrationStatus
CreatedAt time.Time
LastUpdatedAt time.Time
SerializedInput string
SerializedOutput string
SerializedCustomStatus string
FailureDetails *protos.TaskFailureDetails
}

// NewOrchestrationOptions configures options for starting a new orchestration.
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error

Expand Down Expand Up @@ -110,9 +96,9 @@ func WithInput(input any) NewOrchestrationOptions {
}

// WithRawInput configures an input for the orchestration. The specified input must be a string.
func WithRawInput(rawInput string) NewOrchestrationOptions {
func WithRawInput(rawInput *wrapperspb.StringValue) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
req.Input = wrapperspb.String(rawInput)
req.Input = rawInput
return nil
}
}
Expand Down Expand Up @@ -147,9 +133,9 @@ func WithEventPayload(data any) RaiseEventOptions {
}

// WithRawEventData configures an event payload that is a raw, unprocessed string (e.g. JSON data).
func WithRawEventData(data string) RaiseEventOptions {
func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions {
return func(req *protos.RaiseEventRequest) error {
req.Input = wrapperspb.String(data)
req.Input = data
return nil
}
}
Expand All @@ -167,9 +153,9 @@ func WithOutput(data any) TerminateOptions {
}

// WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for the terminated orchestration.
func WithRawOutput(data string) TerminateOptions {
func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions {
return func(req *protos.TerminateRequest) error {
req.Output = wrapperspb.String(data)
req.Output = data
return nil
}
}
Expand All @@ -190,168 +176,11 @@ func WithRecursivePurge(recursive bool) PurgeOptions {
}
}

func NewOrchestrationMetadata(
iid InstanceID,
name string,
status protos.OrchestrationStatus,
createdAt time.Time,
lastUpdatedAt time.Time,
serializedInput string,
serializedOutput string,
serializedCustomStatus string,
failureDetails *protos.TaskFailureDetails,
) *OrchestrationMetadata {
return &OrchestrationMetadata{
InstanceID: iid,
Name: name,
RuntimeStatus: status,
CreatedAt: createdAt,
LastUpdatedAt: lastUpdatedAt,
SerializedInput: serializedInput,
SerializedOutput: serializedOutput,
SerializedCustomStatus: serializedCustomStatus,
FailureDetails: failureDetails,
}
}

func (m *OrchestrationMetadata) MarshalJSON() ([]byte, error) {
obj := make(map[string]any, 16)

// Required values
obj["id"] = m.InstanceID
obj["name"] = m.Name
obj["status"] = helpers.ToRuntimeStatusString(m.RuntimeStatus)
obj["createdAt"] = m.CreatedAt
obj["lastUpdatedAt"] = m.LastUpdatedAt

// Optional values
if m.SerializedInput != "" {
obj["serializedInput"] = m.SerializedInput
}
if m.SerializedOutput != "" {
obj["serializedOutput"] = m.SerializedOutput
}
if m.SerializedCustomStatus != "" {
obj["serializedCustomStatus"] = m.SerializedCustomStatus
}

// Optional failure details (recursive)
if m.FailureDetails != nil {
const fieldCount = 4
root := make(map[string]any, fieldCount)
current := root
f := m.FailureDetails
for {
current["type"] = f.ErrorType
current["message"] = f.ErrorMessage
if f.StackTrace != nil {
current["stackTrace"] = f.StackTrace.GetValue()
}
if f.InnerFailure == nil {
// base case
break
}
// recursive case
f = f.InnerFailure
inner := make(map[string]any, fieldCount)
current["innerFailure"] = inner
current = inner
}
obj["failureDetails"] = root
}
return json.Marshal(obj)
}

func (m *OrchestrationMetadata) UnmarshalJSON(data []byte) (err error) {
defer func() {
if r := recover(); r != nil {
if rerr, ok := r.(error); ok {
err = fmt.Errorf("failed to unmarshal the JSON payload: %w", rerr)
} else {
err = errors.New("failed to unmarshal the JSON payload")
}
}
}()

var obj map[string]any
if err := json.Unmarshal(data, &obj); err != nil {
return fmt.Errorf("failed to unmarshal orchestration metadata json: %w", err)
}

if id, ok := obj["id"]; ok {
m.InstanceID = InstanceID(id.(string))
} else {
return errors.New("missing 'id' field")
}
if name, ok := obj["name"]; ok {
m.Name = name.(string)
} else {
return errors.New("missing 'name' field")
}
if status, ok := obj["status"]; ok {
m.RuntimeStatus = helpers.FromRuntimeStatusString(status.(string))
} else {
return errors.New("missing 'name' field")
}
if createdAt, ok := obj["createdAt"]; ok {
if time, err := time.Parse(time.RFC3339, createdAt.(string)); err == nil {
m.CreatedAt = time
} else {
return errors.New("invalid 'createdAt' field: must be RFC3339 format")
}
} else {
return errors.New("missing 'createdAt' field")
}
if lastUpdatedAt, ok := obj["lastUpdatedAt"]; ok {
if time, err := time.Parse(time.RFC3339, lastUpdatedAt.(string)); err == nil {
m.LastUpdatedAt = time
} else {
return errors.New("invalid 'lastUpdatedAt' field: must be RFC3339 format")
}
} else {
return errors.New("missing 'lastUpdatedAt' field")
}
if input, ok := obj["serializedInput"]; ok {
m.SerializedInput = input.(string)
}
if output, ok := obj["serializedOutput"]; ok {
m.SerializedOutput = output.(string)
}
if output, ok := obj["serializedCustomStatus"]; ok {
m.SerializedCustomStatus = output.(string)
}

failureDetails, ok := obj["failureDetails"]
if ok {
m.FailureDetails = &protos.TaskFailureDetails{}
current := m.FailureDetails
obj = failureDetails.(map[string]any)
for {
current.ErrorType = obj["type"].(string)
current.ErrorMessage = obj["message"].(string)
if stackTrace, ok := obj["stackTrace"]; ok {
current.StackTrace = wrapperspb.String(stackTrace.(string))
}
if innerFailure, ok := obj["innerFailure"]; ok {
// recursive case
next := &protos.TaskFailureDetails{}
current.InnerFailure = next
current = next
obj = innerFailure.(map[string]any)
} else {
// base case
break
}
}
}
return nil
}

func (o *OrchestrationMetadata) IsRunning() bool {
return !o.IsComplete()
func OrchestrationMetadataIsRunning(o *protos.OrchestrationMetadata) bool {
return !OrchestrationMetadataIsComplete(o)
}

func (o *OrchestrationMetadata) IsComplete() bool {
func OrchestrationMetadataIsComplete(o *protos.OrchestrationMetadata) bool {
return o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED ||
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED ||
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED ||
Expand Down
Loading

0 comments on commit 327ccdd

Please sign in to comment.