Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Flyte Execution tags (#571)
Browse files Browse the repository at this point in the history
* test

Signed-off-by: Kevin Su <[email protected]>

* Execution tags

Signed-off-by: Kevin Su <[email protected]>

* Add tags filter

Signed-off-by: Kevin Su <[email protected]>

* Add execution tags table

Signed-off-by: Kevin Su <[email protected]>

* update tests

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* update

Signed-off-by: Kevin Su <[email protected]>

* update

Signed-off-by: Kevin Su <[email protected]>

* update migration

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* use gorm size

Signed-off-by: Kevin Su <[email protected]>

* Add tests

Signed-off-by: Kevin Su <[email protected]>

* bump idl

Signed-off-by: Kevin Su <[email protected]>

* bump idl

Signed-off-by: Kevin Su <[email protected]>

* bump idl

Signed-off-by: Kevin Su <[email protected]>

* address comment

Signed-off-by: Kevin Su <[email protected]>

* Set tag_name column as unique

Signed-off-by: Kevin Su <[email protected]>

* Add integration tests

Signed-off-by: Kevin Su <[email protected]>

* Update the tests

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* BeforeCreate

Signed-off-by: Kevin Su <[email protected]>

* Update migration ID

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* bump idl

Signed-off-by: Kevin Su <[email protected]>

* update migration id

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Aug 7, 2023
1 parent d34a940 commit 04bcb15
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.11
github.com/flyteorg/flyteidl v1.5.14
github.com/flyteorg/flyteplugins v1.0.67
github.com/flyteorg/flytepropeller v1.1.98
github.com/flyteorg/flytestdlib v1.0.22
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0=
github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
NamedEntityMetadata = "nem"
Project = "p"
Signal = "s"
AdminTag = "at"
ExecutionAdminTag = "eat"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
workflowExecutionID, err)
return nil, nil, err
}

return ctx, executionModel, nil
}

Expand Down Expand Up @@ -1478,6 +1479,7 @@ func (m *ExecutionManager) ListExecutions(
execution.Spec.Inputs = nil
execution.Closure.ComputedInputs = nil
}

// END TO BE DELETED
var token string
if len(executionList) == int(request.Limit) {
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/impl/testutils/mock_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest {
},
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"},
Envs: &admin.Envs{},
Tags: []string{"tag1", "tag2"},
},
Inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var filterFieldEntityPrefix = map[string]common.Entity{
"named_entity_metadata": common.NamedEntityMetadata,
"project": common.Project,
"signal": common.Signal,
"admin_tag": common.AdminTag,
"execution_admin_tag": common.ExecutionAdminTag,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,89 @@ var NoopMigrations = []*gormigrate.Migration{
return nil
},
},

// Migration that creates the table backing the AdminTag model (the tag names
// that can be attached to executions).
{
ID: "2023-08-04-admin-tags",
Migrate: func(tx *gorm.DB) error {
// The model is declared locally inside the migration rather than imported.
// NOTE(review): presumably so later edits to the models package cannot
// change what this migration does — confirm.
type AdminTag struct {
gorm.Model
// The gorm tag declares a unique index on the column and a size of 255.
Name string `gorm:"index:,unique;size:255"`
}

return tx.AutoMigrate(&AdminTag{})
},
// Rollback is a no-op: it reports success without undoing anything.
Rollback: func(tx *gorm.DB) error {
return nil
},
},

// Migration that adds the many2many relation between executions and admin
// tags. AutoMigrate is run on a local copy of the Execution model whose Tags
// field names execution_admin_tags as the join table.
{
	ID: "2023-08-04-execution-admin-tags", // A join table used to associate executions with tags
	Migrate: func(tx *gorm.DB) error {
		type AdminTag struct {
			gorm.Model
			Name string `gorm:"index:,unique;size:255"`
		}

		type ExecutionKey struct {
			Project string `gorm:"primary_key;column:execution_project" valid:"length(0|255)"`
			Domain  string `gorm:"primary_key;column:execution_domain" valid:"length(0|255)"`
			Name    string `gorm:"primary_key;column:execution_name" valid:"length(0|255)"`
		}

		type Execution struct {
			ID        uint `gorm:"index;autoIncrement;not null"`
			CreatedAt time.Time
			UpdatedAt time.Time
			DeletedAt *time.Time `gorm:"index"`
			ExecutionKey
			LaunchPlanID uint   `gorm:"index"`
			WorkflowID   uint   `gorm:"index"`
			TaskID       uint   `gorm:"index"`
			Phase        string `valid:"length(0|255)"`
			Closure      []byte
			Spec         []byte `gorm:"not null"`
			StartedAt    *time.Time
			// Corresponds to the CreatedAt field in the Execution closure.
			// Prefixed with Execution to avoid clashes with gorm.Model CreatedAt
			ExecutionCreatedAt *time.Time `gorm:"index:idx_executions_created_at"`
			// Corresponds to the UpdatedAt field in the Execution closure
			// Prefixed with Execution to avoid clashes with gorm.Model UpdatedAt
			ExecutionUpdatedAt *time.Time
			Duration           time.Duration
			// In the case of an aborted execution this string may be non-empty.
			// It should be ignored for any other value of phase other than aborted.
			AbortCause string `valid:"length(0|255)"`
			// Corresponds to the execution mode used to trigger this execution
			Mode int32
			// The "parent" execution (if there is one) that is related to this execution.
			SourceExecutionID uint
			// The parent node execution if this was launched by a node
			ParentNodeExecutionID uint
			// Cluster where execution was triggered
			Cluster string `valid:"length(0|255)"`
			// Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults.
			InputsURI storage.DataReference
			// User specified inputs. This map might be incomplete and not include defaults applied
			UserInputsURI storage.DataReference
			// Execution Error Kind. nullable
			ErrorKind *string `gorm:"index"`
			// Execution Error Code nullable
			ErrorCode *string `valid:"length(0|255)"`
			// The user responsible for launching this execution.
			// This is also stored in the spec but promoted as a column for filtering.
			User string `gorm:"index" valid:"length(0|255)"`
			// GORM doesn't save the zero value for ints, so we use a pointer for the State field
			State *int32 `gorm:"index;default:0"`
			// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
			LaunchEntity string
			// Tags associated with the execution
			Tags []AdminTag `gorm:"many2many:execution_admin_tags;"`
		}

		return tx.AutoMigrate(&Execution{})
	},
	// No-op rollback, matching the admin-tags migration declared alongside this
	// one. Without a Rollback function the migration library cannot roll this
	// entry back at all; like its siblings it reports success without undoing
	// the schema change.
	Rollback: func(tx *gorm.DB) error {
		return nil
	},
},
}

var Migrations = append(LegacyMigrations, NoopMigrations...)
Expand Down
4 changes: 4 additions & 0 deletions pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const taskExecutionTableName = "task_executions"
const taskTableName = "tasks"
const workflowTableName = "workflows"
const descriptionEntityTableName = "description_entities"
const AdminTagsTableName = "admin_tags"
const executionAdminTagsTableName = "execution_admin_tags"

const limit = "limit"
const filters = "filters"
Expand All @@ -45,6 +47,8 @@ var entityToTableName = map[common.Entity]string{
common.NamedEntity: "entities",
common.NamedEntityMetadata: "named_entity_metadata",
common.Signal: "signals",
common.AdminTag: "admin_tags",
common.ExecutionAdminTag: "execution_admin_tags",
}

var innerJoinExecToNodeExec = fmt.Sprintf(
Expand Down
11 changes: 9 additions & 2 deletions pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) erro
return nil
}

func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
func (r *ExecutionRepo) Get(_ context.Context, input interfaces.Identifier) (models.Execution, error) {
var execution models.Execution
timer := r.metrics.GetDuration.Start()
tx := r.db.Where(&models.Execution{
Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution)
return nil
}

func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) (
func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error) {
var err error
// First validate input.
Expand All @@ -89,6 +89,13 @@ func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceI
taskTableName, executionTableName, taskTableName))
}

// Join executions to their tags when the caller filters on admin tags.
// Rows in the join table identify an execution by its full
// (execution_project, execution_domain, execution_name) key, so all three
// columns must take part in the join. Matching on execution_name alone would
// also pick up tags of same-named executions in other projects or domains.
if ok := input.JoinTableEntities[common.AdminTag]; ok {
	tx = tx.Joins(fmt.Sprintf(
		"INNER JOIN %s ON %s.execution_project = %s.execution_project"+
			" AND %s.execution_domain = %s.execution_domain"+
			" AND %s.execution_name = %s.execution_name",
		executionAdminTagsTableName,
		executionTableName, executionAdminTagsTableName,
		executionTableName, executionAdminTagsTableName,
		executionTableName, executionAdminTagsTableName))
	// Resolve each join-table row to its tag so filters can reference the tag's columns.
	tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.id = %s.admin_tag_id",
		AdminTagsTableName, AdminTagsTableName, executionAdminTagsTableName))
}

// Apply filters
tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
if err != nil {
Expand Down
40 changes: 37 additions & 3 deletions pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,36 @@ func TestListExecutions_Order(t *testing.T) {
assert.True(t, mockQuery.Triggered)
}

// TestListExecutions_WithTags checks that List accepts a repeated-value tag
// filter next to the usual equality filters and still issues the sorted query.
func TestListExecutions_WithTags(t *testing.T) {
	repo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())

	catcher := mocket.Catcher.Reset()
	// The mock only fires for queries that order by name ascending; it replies
	// with an empty result set.
	sortedQuery := catcher.NewMock().WithQuery(`name asc`)
	sortedQuery.WithReply([]map[string]interface{}{})

	byNameAsc, _ := common.NewSortParameter(admin.Sort{
		Key:       "name",
		Direction: admin.Sort_ASCENDING,
	})
	tagFilter, err := common.NewRepeatedValueFilter(
		common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", []string{"tag1", "tag2"})
	assert.NoError(t, err)

	listInput := interfaces.ListResourceInput{
		Limit:         20,
		SortParameter: byNameAsc,
		InlineFilters: []common.InlineFilter{
			getEqualityFilter(common.Task, "project", project),
			getEqualityFilter(common.Task, "domain", domain),
			getEqualityFilter(common.Task, "name", name),
			tagFilter,
		},
	}
	_, err = repo.List(context.Background(), listInput)
	assert.NoError(t, err)
	assert.True(t, sortedQuery.Triggered)
}

func TestListExecutions_MissingParameters(t *testing.T) {
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
_, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
Expand Down Expand Up @@ -306,29 +336,33 @@ func TestListExecutionsForWorkflow(t *testing.T) {
StartedAt: &executionStartedAt,
Duration: time.Hour,
LaunchEntity: "launch_plan",
Tags: []models.AdminTag{{Name: "tag1"}, {Name: "tag2"}},
})
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true

// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions)

GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions)
vals := []string{"tag1", "tag2"}
tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals)
assert.NoError(t, err)
collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.Execution, "project", project),
getEqualityFilter(common.Execution, "domain", domain),
getEqualityFilter(common.Execution, "name", "1"),
getEqualityFilter(common.Workflow, "name", "workflow_name"),
getEqualityFilter(common.Task, "name", "task_name"),
tagFilter,
},
Limit: 20,
JoinTableEntities: map[common.Entity]bool{
common.Workflow: true,
common.Task: true,
},
})

assert.NoError(t, err)
assert.NotEmpty(t, collection)
assert.NotEmpty(t, collection.Executions)
Expand Down
19 changes: 19 additions & 0 deletions pkg/repositories/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package models
import (
"time"

"gorm.io/gorm/clause"

"gorm.io/gorm"

"github.com/flyteorg/flytestdlib/storage"
)

Expand Down Expand Up @@ -60,4 +64,19 @@ type Execution struct {
State *int32 `gorm:"index;default:0"`
// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
LaunchEntity string
// Tags associated with the execution
Tags []AdminTag `gorm:"many2many:execution_admin_tags;"`
}

// AdminTag is the database model for a tag that can be attached to executions.
// Execution.Tags links the two through the execution_admin_tags join table.
type AdminTag struct {
gorm.Model
// Name is the tag text. The gorm tag declares a unique index on the column
// and a size of 255.
Name string `gorm:"index:,unique;size:255"`
}

// BeforeCreate attaches an ON CONFLICT clause keyed on the "name" column to the
// statement being built, with "name" as the column to assign on conflict.
// NOTE(review): presumably this lets an insert of an already existing tag name
// resolve to an update instead of failing on the unique index — confirm.
// It always returns nil.
func (b *AdminTag) BeforeCreate(tx *gorm.DB) (err error) {
	upsertOnName := clause.OnConflict{
		// Conflict target: the uniquely indexed name column.
		Columns: []clause.Column{{Name: "name"}},
		// Column assigned when the conflict target matches an existing row.
		DoUpdates: clause.AssignmentColumns([]string{"name"}),
	}
	tx.Statement.AddClause(upsertOnName)
	return nil
}
6 changes: 6 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
tags := make([]models.AdminTag, len(input.RequestSpec.Tags))
for i, tag := range input.RequestSpec.Tags {
tags[i] = models.AdminTag{Name: tag}
}

executionModel := &models.Execution{
ExecutionKey: models.ExecutionKey{
Project: input.WorkflowExecutionID.Project,
Expand All @@ -119,6 +124,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
User: requestSpec.Metadata.Principal,
State: &activeExecution,
LaunchEntity: strings.ToLower(input.LaunchEntity.String()),
Tags: tags,
}
// A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed
// with a reference launch plan which is why this behavior is the default below.
Expand Down
4 changes: 4 additions & 0 deletions tests/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func truncateAllTablesForTestingOnly() {
TruncateResources := fmt.Sprintf("TRUNCATE TABLE resources;")
TruncateSchedulableEntities := fmt.Sprintf("TRUNCATE TABLE schedulable_entities;")
TruncateSchedulableEntitiesSnapshots := fmt.Sprintf("TRUNCATE TABLE schedule_entities_snapshots;")
TruncateAdminTags := fmt.Sprintf("TRUNCATE TABLE admin_tags;")
TruncateExecutionAdminTags := fmt.Sprintf("TRUNCATE TABLE execution_admin_tags;")
ctx := context.Background()
db, err := repositories.GetDB(ctx, getDbConfig(), getLoggerConfig())
if err != nil {
Expand Down Expand Up @@ -100,6 +102,8 @@ func truncateAllTablesForTestingOnly() {
db.Exec(TruncateResources)
db.Exec(TruncateSchedulableEntities)
db.Exec(TruncateSchedulableEntitiesSnapshots)
db.Exec(TruncateAdminTags)
db.Exec(TruncateExecutionAdminTags)
}

func populateWorkflowExecutionForTestingOnly(project, domain, name string) {
Expand Down
28 changes: 28 additions & 0 deletions tests/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func populateWorkflowExecutionsForTestingOnly() {
db.Exec(`INSERT INTO workflows ("id", "project", "domain", "name", "version", "remote_closure_identifier") ` +
`VALUES (4, 'project2', 'domain2', 'name2', 'version1', 's3://foo')`)

// Insert dummy tags
db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (1, 'hello')`)
db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (2, 'flyte')`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 1)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 2)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name3', 2)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name4', 1)`)

for _, statement := range insertExecutionStatements {
db.Exec(statement)
}
Expand All @@ -209,6 +217,26 @@ func TestListWorkflowExecutions(t *testing.T) {
assert.Equal(t, len(resp.Executions), 4)
}

// TestListWorkflowExecutionsWithTags verifies that ListExecutions honors a tag
// filter. The fixture rows link the tag "hello" to two executions in
// project1/domain1 (name1 and name4), so exactly two results are expected.
func TestListWorkflowExecutionsWithTags(t *testing.T) {
	truncateAllTablesForTestingOnly()
	populateWorkflowExecutionsForTestingOnly()

	ctx := context.Background()
	client, conn := GetTestAdminServiceClient()
	defer conn.Close()

	resp, err := client.ListExecutions(ctx, &admin.ResourceListRequest{
		Id: &admin.NamedEntityIdentifier{
			Project: "project1",
			Domain:  "domain1",
		},
		Limit:   5,
		Filters: "value_in(admin_tag.name, hello)",
	})
	// Stop on failure: resp must not be dereferenced when the call errored,
	// otherwise the test panics instead of reporting the error.
	if !assert.NoError(t, err) {
		return
	}
	// assert.Len reports the actual length and contents on mismatch.
	assert.Len(t, resp.Executions, 2)
}

func TestListWorkflowExecutions_Filters(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()
Expand Down

0 comments on commit 04bcb15

Please sign in to comment.