Skip to content

Commit

Permalink
fix: load missing fields for archived workflows (#13136)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiacheng Xu <[email protected]>
  • Loading branch information
jiachengxu authored Jun 3, 2024
1 parent 85c8832 commit b212d5f
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/upper/db/v4"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand All @@ -31,9 +32,16 @@ type archivedWorkflowMetadata struct {
Phase wfv1.WorkflowPhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
Labels string `db:"labels,omitempty"`
Annotations string `db:"annotations,omitempty"`
Progress string `db:"progress,omitempty"`

// The following fields are not stored as columns in the database, and they are stored as JSON strings in the workflow column, and will be loaded from there.
CreationTimestamp string `db:"creationtimestamp,omitempty"`
Labels string `db:"labels,omitempty"`
Annotations string `db:"annotations,omitempty"`
Suspend *bool `db:"suspend,omitempty"`
Message string `db:"message,omitempty"`
Progress string `db:"progress,omitempty"`
EstimatedDuration int `db:"estimatedduration,omitempty"`
ResourcesDuration string `db:"resourcesduration,omitempty"`
}

type archivedWorkflowRecord struct {
Expand Down Expand Up @@ -182,20 +190,36 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl
return nil, err
}

t, err := time.Parse(time.RFC3339, md.CreationTimestamp)
if err != nil {
return nil, err
}

resourcesDuration := make(map[corev1.ResourceName]wfv1.ResourceDuration)
if err := json.Unmarshal([]byte(md.ResourcesDuration), &resourcesDuration); err != nil {
return nil, err
}

wfs[i] = wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: md.Name,
Namespace: md.Namespace,
UID: types.UID(md.UID),
CreationTimestamp: v1.Time{Time: md.StartedAt},
CreationTimestamp: v1.Time{Time: t},
Labels: labels,
Annotations: annotations,
},
Spec: wfv1.WorkflowSpec{
Suspend: md.Suspend,
},
Status: wfv1.WorkflowStatus{
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
Progress: wfv1.Progress(md.Progress),
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
Progress: wfv1.Progress(md.Progress),
Message: md.Message,
EstimatedDuration: wfv1.EstimatedDuration(md.EstimatedDuration),
ResourcesDuration: resourcesDuration,
},
}
}
Expand Down Expand Up @@ -356,9 +380,9 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->>'$.metadata.labels', '{}') as labels,coalesce(workflow->>'$.metadata.annotations', '{}') as annotations, coalesce(workflow->>'$.status.progress', '') as progress"), nil
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->>'$.metadata.labels', '{}') as labels,coalesce(workflow->>'$.metadata.annotations', '{}') as annotations, coalesce(workflow->>'$.status.progress', '') as progress, coalesce(workflow->>'$.metadata.creationTimestamp', '') as creationtimestamp, workflow->>'$.spec.suspend' as suspend, coalesce(workflow->>'$.status.message', '') as message, coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration, coalesce(workflow->>'$.status.resourcesDuration', '{}') as resourcesduration"), nil
case Postgres:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress"), nil
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress, coalesce((workflow::json)->'metadata'->>'creationTimestamp', '') as creationtimestamp, (workflow::json)->'spec'->>'suspend' as suspend, coalesce((workflow::json)->'status'->>'message', '') as message, coalesce((workflow::json)->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce((workflow::json)->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil
}
return nil, fmt.Errorf("unsupported db type %s", t)
}

0 comments on commit b212d5f

Please sign in to comment.