From 65ece76918d790476584656ee4f41ef798268dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Fri, 20 Sep 2024 13:06:45 +0200 Subject: [PATCH] changes after review --- .../assert/objectassert/task_snowflake_ext.go | 18 +++--- pkg/sdk/tasks_def.go | 15 +++-- pkg/sdk/tasks_dto_builders_gen.go | 12 ++-- pkg/sdk/tasks_dto_gen.go | 34 +++++----- pkg/sdk/tasks_gen.go | 42 ++++++------ pkg/sdk/tasks_gen_test.go | 8 +-- pkg/sdk/tasks_impl_gen.go | 39 ++++++----- pkg/sdk/tasks_validations_gen.go | 15 ++++- pkg/sdk/testint/tasks_gen_integration_test.go | 64 ++++++++++++++++--- 9 files changed, 156 insertions(+), 91 deletions(-) diff --git a/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go b/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go index 8762de1c4d..de8b6d1974 100644 --- a/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go @@ -10,30 +10,30 @@ import ( "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" ) -func (w *TaskAssert) HasNotEmptyCreatedOn() *TaskAssert { - w.AddAssertion(func(t *testing.T, o *sdk.Task) error { +func (t *TaskAssert) HasNotEmptyCreatedOn() *TaskAssert { + t.AddAssertion(func(t *testing.T, o *sdk.Task) error { t.Helper() if o.CreatedOn == "" { return fmt.Errorf("expected created on not empty; got: %v", o.CreatedOn) } return nil }) - return w + return t } -func (w *TaskAssert) HasNotEmptyId() *TaskAssert { - w.AddAssertion(func(t *testing.T, o *sdk.Task) error { +func (t *TaskAssert) HasNotEmptyId() *TaskAssert { + t.AddAssertion(func(t *testing.T, o *sdk.Task) error { t.Helper() if o.Id == "" { return fmt.Errorf("expected id not empty; got: %v", o.CreatedOn) } return nil }) - return w + return t } -func (w *TaskAssert) HasPredecessors(ids ...sdk.SchemaObjectIdentifier) *TaskAssert { - w.AddAssertion(func(t *testing.T, o *sdk.Task) error { +func (t *TaskAssert) HasPredecessors(ids ...sdk.SchemaObjectIdentifier) *TaskAssert { + t.AddAssertion(func(t *testing.T, o *sdk.Task) error { t.Helper() if len(o.Predecessors) != len(ids) { return fmt.Errorf("expected %d (%v) predecessors, got %d (%v)", len(ids), ids, len(o.Predecessors), o.Predecessors) @@ -48,7 +48,7 @@ func (w *TaskAssert) HasPredecessors(ids ...sdk.SchemaObjectIdentifier) *TaskAss } return errors.Join(errs...) }) - return w + return t } func (t *TaskAssert) HasTaskRelations(expected sdk.TaskRelations) *TaskAssert { diff --git a/pkg/sdk/tasks_def.go b/pkg/sdk/tasks_def.go index b2bc05f399..26214a521d 100644 --- a/pkg/sdk/tasks_def.go +++ b/pkg/sdk/tasks_def.go @@ -112,7 +112,7 @@ var task = g.PlainStruct("Task"). Text("Definition"). OptionalText("Condition"). Bool("AllowOverlappingExecution"). - OptionalText("ErrorIntegration"). + Field("ErrorIntegration", g.KindOfTSlice[AccountObjectIdentifier]()). OptionalText("LastCommittedOn"). OptionalText("LastSuspendedOn"). Text("OwnerRoleType"). @@ -146,7 +146,7 @@ var TasksDef = g.NewInterface( OptionalSessionParameters(). OptionalNumberAssignment("USER_TASK_TIMEOUT_MS", nil). OptionalNumberAssignment("SUSPEND_TASK_AFTER_NUM_FAILURES", nil). - OptionalTextAssignment("ERROR_INTEGRATION", g.ParameterOptions().NoQuotes()). + OptionalIdentifier("ErrorNotificationIntegration", g.KindOfT[AccountObjectIdentifier](), g.IdentifierOptions().Equals().SQL("ERROR_INTEGRATION")). OptionalTextAssignment("COMMENT", g.ParameterOptions().SingleQuotes()). OptionalIdentifier("Finalize", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Equals().SQL("FINALIZE")). OptionalNumberAssignment("TASK_AUTO_RETRY_ATTEMPTS", g.ParameterOptions()). @@ -157,6 +157,7 @@ var TasksDef = g.NewInterface( SQL("AS"). Text("sql", g.KeywordOptions().NoQuotes().Required()). WithValidation(g.ValidIdentifier, "name"). + WithValidation(g.ValidIdentifierIfSet, "ErrorNotificationIntegration"). WithValidation(g.ConflictingFields, "OrReplace", "IfNotExists"), taskCreateWarehouse, ). @@ -174,7 +175,7 @@ var TasksDef = g.NewInterface( OptionalNumberAssignment("USER_TASK_TIMEOUT_MS", nil). OptionalSessionParameters(). OptionalNumberAssignment("SUSPEND_TASK_AFTER_NUM_FAILURES", nil). - OptionalTextAssignment("ERROR_INTEGRATION", g.ParameterOptions().NoQuotes()). + OptionalIdentifier("ErrorNotificationIntegration", g.KindOfT[AccountObjectIdentifier](), g.IdentifierOptions().Equals().SQL("ERROR_INTEGRATION")). OptionalTextAssignment("COMMENT", g.ParameterOptions().SingleQuotes()). OptionalIdentifier("Finalize", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Equals().SQL("FINALIZE")). OptionalNumberAssignment("TASK_AUTO_RETRY_ATTEMPTS", g.ParameterOptions()). @@ -182,7 +183,8 @@ var TasksDef = g.NewInterface( OptionalTextAssignment("WHEN", g.ParameterOptions().NoQuotes().NoEquals()). SQL("AS"). Text("sql", g.KeywordOptions().NoQuotes().Required()). - WithValidation(g.ValidIdentifier, "name"), + WithValidation(g.ValidIdentifier, "name"). + WithValidation(g.ValidIdentifierIfSet, "ErrorNotificationIntegration"), ). CustomOperation( "Clone", @@ -219,13 +221,14 @@ var TasksDef = g.NewInterface( OptionalBooleanAssignment("ALLOW_OVERLAPPING_EXECUTION", nil). OptionalNumberAssignment("USER_TASK_TIMEOUT_MS", nil). OptionalNumberAssignment("SUSPEND_TASK_AFTER_NUM_FAILURES", nil). - OptionalTextAssignment("ERROR_INTEGRATION", g.ParameterOptions().NoQuotes()). + OptionalIdentifier("ErrorNotificationIntegration", g.KindOfT[AccountObjectIdentifier](), g.IdentifierOptions().Equals().SQL("ERROR_INTEGRATION")). OptionalTextAssignment("COMMENT", g.ParameterOptions().SingleQuotes()). OptionalSessionParameters(). OptionalNumberAssignment("TASK_AUTO_RETRY_ATTEMPTS", nil). OptionalNumberAssignment("USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS", nil). WithValidation(g.AtLeastOneValueSet, "Warehouse", "UserTaskManagedInitialWarehouseSize", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParameters", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds"). - WithValidation(g.ConflictingFields, "Warehouse", "UserTaskManagedInitialWarehouseSize"), + WithValidation(g.ConflictingFields, "Warehouse", "UserTaskManagedInitialWarehouseSize"). + WithValidation(g.ValidIdentifierIfSet, "ErrorNotificationIntegration"), g.ListOptions().SQL("SET"), ). OptionalQueryStructField( diff --git a/pkg/sdk/tasks_dto_builders_gen.go b/pkg/sdk/tasks_dto_builders_gen.go index 24ec66629f..7a0397bca2 100644 --- a/pkg/sdk/tasks_dto_builders_gen.go +++ b/pkg/sdk/tasks_dto_builders_gen.go @@ -59,8 +59,8 @@ func (s *CreateTaskRequest) WithSuspendTaskAfterNumFailures(SuspendTaskAfterNumF return s } -func (s *CreateTaskRequest) WithErrorIntegration(ErrorIntegration string) *CreateTaskRequest { - s.ErrorIntegration = &ErrorIntegration +func (s *CreateTaskRequest) WithErrorNotificationIntegration(ErrorNotificationIntegration AccountObjectIdentifier) *CreateTaskRequest { + s.ErrorNotificationIntegration = &ErrorNotificationIntegration return s } @@ -158,8 +158,8 @@ func (s *CreateOrAlterTaskRequest) WithSuspendTaskAfterNumFailures(SuspendTaskAf return s } -func (s *CreateOrAlterTaskRequest) WithErrorIntegration(ErrorIntegration string) *CreateOrAlterTaskRequest { - s.ErrorIntegration = &ErrorIntegration +func (s *CreateOrAlterTaskRequest) WithErrorNotificationIntegration(ErrorNotificationIntegration AccountObjectIdentifier) *CreateOrAlterTaskRequest { + s.ErrorNotificationIntegration = &ErrorNotificationIntegration return s } @@ -325,8 +325,8 @@ func (s *TaskSetRequest) WithSuspendTaskAfterNumFailures(SuspendTaskAfterNumFail return s } -func (s *TaskSetRequest) WithErrorIntegration(ErrorIntegration string) *TaskSetRequest { - s.ErrorIntegration = &ErrorIntegration +func (s *TaskSetRequest) WithErrorNotificationIntegration(ErrorNotificationIntegration AccountObjectIdentifier) *TaskSetRequest { + s.ErrorNotificationIntegration = &ErrorNotificationIntegration return s } diff --git a/pkg/sdk/tasks_dto_gen.go b/pkg/sdk/tasks_dto_gen.go index 9a909cbbd6..e6a2726b4e 100644 --- a/pkg/sdk/tasks_dto_gen.go +++ b/pkg/sdk/tasks_dto_gen.go @@ -24,7 +24,7 @@ type CreateTaskRequest struct { SessionParameters *SessionParameters UserTaskTimeoutMs *int SuspendTaskAfterNumFailures *int - ErrorIntegration *string + ErrorNotificationIntegration *AccountObjectIdentifier Comment *string Finalize *SchemaObjectIdentifier TaskAutoRetryAttempts *int @@ -45,21 +45,21 @@ func (r *CreateTaskRequest) GetName() SchemaObjectIdentifier { } type CreateOrAlterTaskRequest struct { - name SchemaObjectIdentifier // required - Warehouse *CreateTaskWarehouseRequest - Schedule *string - Config *string - AllowOverlappingExecution *bool - UserTaskTimeoutMs *int - SessionParameters *SessionParameters - SuspendTaskAfterNumFailures *int - ErrorIntegration *string - Comment *string - Finalize *SchemaObjectIdentifier - TaskAutoRetryAttempts *int - After []SchemaObjectIdentifier - When *string - sql string // required + name SchemaObjectIdentifier // required + Warehouse *CreateTaskWarehouseRequest + Schedule *string + Config *string + AllowOverlappingExecution *bool + UserTaskTimeoutMs *int + SessionParameters *SessionParameters + SuspendTaskAfterNumFailures *int + ErrorNotificationIntegration *AccountObjectIdentifier + Comment *string + Finalize *SchemaObjectIdentifier + TaskAutoRetryAttempts *int + After []SchemaObjectIdentifier + When *string + sql string // required } func (r *CreateOrAlterTaskRequest) GetName() SchemaObjectIdentifier { @@ -103,7 +103,7 @@ type TaskSetRequest struct { AllowOverlappingExecution *bool UserTaskTimeoutMs *int SuspendTaskAfterNumFailures *int - ErrorIntegration *string + ErrorNotificationIntegration *AccountObjectIdentifier Comment *string SessionParameters *SessionParameters TaskAutoRetryAttempts *int diff --git a/pkg/sdk/tasks_gen.go b/pkg/sdk/tasks_gen.go index ae90dc7d75..56123086ff 100644 --- a/pkg/sdk/tasks_gen.go +++ b/pkg/sdk/tasks_gen.go @@ -33,7 +33,7 @@ type CreateTaskOptions struct { SessionParameters *SessionParameters `ddl:"list,no_parentheses"` UserTaskTimeoutMs *int `ddl:"parameter" sql:"USER_TASK_TIMEOUT_MS"` SuspendTaskAfterNumFailures *int `ddl:"parameter" sql:"SUSPEND_TASK_AFTER_NUM_FAILURES"` - ErrorIntegration *string `ddl:"parameter,no_quotes" sql:"ERROR_INTEGRATION"` + ErrorNotificationIntegration *AccountObjectIdentifier `ddl:"identifier,equals" sql:"ERROR_INTEGRATION"` Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` Finalize *SchemaObjectIdentifier `ddl:"identifier,equals" sql:"FINALIZE"` TaskAutoRetryAttempts *int `ddl:"parameter" sql:"TASK_AUTO_RETRY_ATTEMPTS"` @@ -52,24 +52,24 @@ type CreateTaskWarehouse struct { // CreateOrAlterTaskOptions is based on https://docs.snowflake.com/en/sql-reference/sql/create-task#create-or-alter-task. type CreateOrAlterTaskOptions struct { - createOrAlter bool `ddl:"static" sql:"CREATE OR ALTER"` - task bool `ddl:"static" sql:"TASK"` - name SchemaObjectIdentifier `ddl:"identifier"` - Warehouse *CreateTaskWarehouse `ddl:"keyword"` - Schedule *string `ddl:"parameter,single_quotes" sql:"SCHEDULE"` - Config *string `ddl:"parameter,no_quotes" sql:"CONFIG"` - AllowOverlappingExecution *bool `ddl:"parameter" sql:"ALLOW_OVERLAPPING_EXECUTION"` - UserTaskTimeoutMs *int `ddl:"parameter" sql:"USER_TASK_TIMEOUT_MS"` - SessionParameters *SessionParameters `ddl:"list,no_parentheses"` - SuspendTaskAfterNumFailures *int `ddl:"parameter" sql:"SUSPEND_TASK_AFTER_NUM_FAILURES"` - ErrorIntegration *string `ddl:"parameter,no_quotes" sql:"ERROR_INTEGRATION"` - Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` - Finalize *SchemaObjectIdentifier `ddl:"identifier,equals" sql:"FINALIZE"` - TaskAutoRetryAttempts *int `ddl:"parameter" sql:"TASK_AUTO_RETRY_ATTEMPTS"` - After []SchemaObjectIdentifier `ddl:"parameter,no_equals" sql:"AFTER"` - When *string `ddl:"parameter,no_quotes,no_equals" sql:"WHEN"` - as bool `ddl:"static" sql:"AS"` - sql string `ddl:"keyword,no_quotes"` + createOrAlter bool `ddl:"static" sql:"CREATE OR ALTER"` + task bool `ddl:"static" sql:"TASK"` + name SchemaObjectIdentifier `ddl:"identifier"` + Warehouse *CreateTaskWarehouse `ddl:"keyword"` + Schedule *string `ddl:"parameter,single_quotes" sql:"SCHEDULE"` + Config *string `ddl:"parameter,no_quotes" sql:"CONFIG"` + AllowOverlappingExecution *bool `ddl:"parameter" sql:"ALLOW_OVERLAPPING_EXECUTION"` + UserTaskTimeoutMs *int `ddl:"parameter" sql:"USER_TASK_TIMEOUT_MS"` + SessionParameters *SessionParameters `ddl:"list,no_parentheses"` + SuspendTaskAfterNumFailures *int `ddl:"parameter" sql:"SUSPEND_TASK_AFTER_NUM_FAILURES"` + ErrorNotificationIntegration *AccountObjectIdentifier `ddl:"identifier,equals" sql:"ERROR_INTEGRATION"` + Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` + Finalize *SchemaObjectIdentifier `ddl:"identifier,equals" sql:"FINALIZE"` + TaskAutoRetryAttempts *int `ddl:"parameter" sql:"TASK_AUTO_RETRY_ATTEMPTS"` + After []SchemaObjectIdentifier `ddl:"parameter,no_equals" sql:"AFTER"` + When *string `ddl:"parameter,no_quotes,no_equals" sql:"WHEN"` + as bool `ddl:"static" sql:"AS"` + sql string `ddl:"keyword,no_quotes"` } // CloneTaskOptions is based on https://docs.snowflake.com/en/sql-reference/sql/create-task#create-task-clone. @@ -112,7 +112,7 @@ type TaskSet struct { AllowOverlappingExecution *bool `ddl:"parameter" sql:"ALLOW_OVERLAPPING_EXECUTION"` UserTaskTimeoutMs *int `ddl:"parameter" sql:"USER_TASK_TIMEOUT_MS"` SuspendTaskAfterNumFailures *int `ddl:"parameter" sql:"SUSPEND_TASK_AFTER_NUM_FAILURES"` - ErrorIntegration *string `ddl:"parameter,no_quotes" sql:"ERROR_INTEGRATION"` + ErrorNotificationIntegration *AccountObjectIdentifier `ddl:"identifier,equals" sql:"ERROR_INTEGRATION"` Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` SessionParameters *SessionParameters `ddl:"list,no_parentheses"` TaskAutoRetryAttempts *int `ddl:"parameter" sql:"TASK_AUTO_RETRY_ATTEMPTS"` @@ -193,7 +193,7 @@ type Task struct { Definition string Condition string AllowOverlappingExecution bool - ErrorIntegration string + ErrorIntegration *AccountObjectIdentifier LastCommittedOn string LastSuspendedOn string OwnerRoleType string diff --git a/pkg/sdk/tasks_gen_test.go b/pkg/sdk/tasks_gen_test.go index f7793fb67e..53aa7d8ea4 100644 --- a/pkg/sdk/tasks_gen_test.go +++ b/pkg/sdk/tasks_gen_test.go @@ -81,7 +81,7 @@ func TestTasks_Create(t *testing.T) { } opts.UserTaskTimeoutMs = Int(5) opts.SuspendTaskAfterNumFailures = Int(6) - opts.ErrorIntegration = String("some_error_integration") + opts.ErrorNotificationIntegration = Pointer(NewAccountObjectIdentifier("some_error_integration")) opts.Comment = String("some comment") opts.Finalize = &finalizerId opts.TaskAutoRetryAttempts = Int(10) @@ -93,7 +93,7 @@ func TestTasks_Create(t *testing.T) { opts.After = []SchemaObjectIdentifier{otherTaskId} opts.When = String(`SYSTEM$STREAM_HAS_DATA('MYSTREAM')`) - assertOptsValidAndSQLEquals(t, opts, "CREATE OR REPLACE TASK %s WAREHOUSE = %s SCHEDULE = '10 MINUTE' CONFIG = $${\"output_dir\": \"/temp/test_directory/\", \"learning_rate\": 0.1}$$ ALLOW_OVERLAPPING_EXECUTION = true JSON_INDENT = 10, LOCK_TIMEOUT = 5 USER_TASK_TIMEOUT_MS = 5 SUSPEND_TASK_AFTER_NUM_FAILURES = 6 ERROR_INTEGRATION = some_error_integration COMMENT = 'some comment' FINALIZE = %s TASK_AUTO_RETRY_ATTEMPTS = 10 TAG (%s = 'v1') USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10 AFTER %s WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT CURRENT_TIMESTAMP", id.FullyQualifiedName(), warehouseId.FullyQualifiedName(), finalizerId.FullyQualifiedName(), tagId.FullyQualifiedName(), otherTaskId.FullyQualifiedName()) + assertOptsValidAndSQLEquals(t, opts, "CREATE OR REPLACE TASK %s WAREHOUSE = %s SCHEDULE = '10 MINUTE' CONFIG = $${\"output_dir\": \"/temp/test_directory/\", \"learning_rate\": 0.1}$$ ALLOW_OVERLAPPING_EXECUTION = true JSON_INDENT = 10, LOCK_TIMEOUT = 5 USER_TASK_TIMEOUT_MS = 5 SUSPEND_TASK_AFTER_NUM_FAILURES = 6 ERROR_INTEGRATION = \"some_error_integration\" COMMENT = 'some comment' FINALIZE = %s TASK_AUTO_RETRY_ATTEMPTS = 10 TAG (%s = 'v1') USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10 AFTER %s WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT CURRENT_TIMESTAMP", id.FullyQualifiedName(), warehouseId.FullyQualifiedName(), finalizerId.FullyQualifiedName(), tagId.FullyQualifiedName(), otherTaskId.FullyQualifiedName()) }) } @@ -157,14 +157,14 @@ func TestTasks_CreateOrAlter(t *testing.T) { LockTimeout: Int(5), } opts.SuspendTaskAfterNumFailures = Int(6) - opts.ErrorIntegration = String("some_error_integration") + opts.ErrorNotificationIntegration = Pointer(NewAccountObjectIdentifier("some_error_integration")) opts.Comment = String("some comment") opts.Finalize = &finalizerId opts.TaskAutoRetryAttempts = Int(10) opts.After = []SchemaObjectIdentifier{otherTaskId} opts.When = String(`SYSTEM$STREAM_HAS_DATA('MYSTREAM')`) - assertOptsValidAndSQLEquals(t, opts, "CREATE OR ALTER TASK %s WAREHOUSE = %s SCHEDULE = '10 MINUTE' CONFIG = $${\"output_dir\": \"/temp/test_directory/\", \"learning_rate\": 0.1}$$ ALLOW_OVERLAPPING_EXECUTION = true USER_TASK_TIMEOUT_MS = 5 JSON_INDENT = 10, LOCK_TIMEOUT = 5 SUSPEND_TASK_AFTER_NUM_FAILURES = 6 ERROR_INTEGRATION = some_error_integration COMMENT = 'some comment' FINALIZE = %s TASK_AUTO_RETRY_ATTEMPTS = 10 AFTER %s WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT CURRENT_TIMESTAMP", id.FullyQualifiedName(), warehouseId.FullyQualifiedName(), finalizerId.FullyQualifiedName(), otherTaskId.FullyQualifiedName()) + assertOptsValidAndSQLEquals(t, opts, "CREATE OR ALTER TASK %s WAREHOUSE = %s SCHEDULE = '10 MINUTE' CONFIG = $${\"output_dir\": \"/temp/test_directory/\", \"learning_rate\": 0.1}$$ ALLOW_OVERLAPPING_EXECUTION = true USER_TASK_TIMEOUT_MS = 5 JSON_INDENT = 10, LOCK_TIMEOUT = 5 SUSPEND_TASK_AFTER_NUM_FAILURES = 6 ERROR_INTEGRATION = \"some_error_integration\" COMMENT = 'some comment' FINALIZE = %s TASK_AUTO_RETRY_ATTEMPTS = 10 AFTER %s WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT CURRENT_TIMESTAMP", id.FullyQualifiedName(), warehouseId.FullyQualifiedName(), finalizerId.FullyQualifiedName(), otherTaskId.FullyQualifiedName()) }) } diff --git a/pkg/sdk/tasks_impl_gen.go b/pkg/sdk/tasks_impl_gen.go index a47c5f3de3..9f00526c25 100644 --- a/pkg/sdk/tasks_impl_gen.go +++ b/pkg/sdk/tasks_impl_gen.go @@ -170,7 +170,7 @@ func (r *CreateTaskRequest) toOpts() *CreateTaskOptions { SessionParameters: r.SessionParameters, UserTaskTimeoutMs: r.UserTaskTimeoutMs, SuspendTaskAfterNumFailures: r.SuspendTaskAfterNumFailures, - ErrorIntegration: r.ErrorIntegration, + ErrorNotificationIntegration: r.ErrorNotificationIntegration, Comment: r.Comment, Finalize: r.Finalize, TaskAutoRetryAttempts: r.TaskAutoRetryAttempts, @@ -191,20 +191,20 @@ func (r *CreateTaskRequest) toOpts() *CreateTaskOptions { func (r *CreateOrAlterTaskRequest) toOpts() *CreateOrAlterTaskOptions { opts := &CreateOrAlterTaskOptions{ - name: r.name, - Schedule: r.Schedule, - Config: r.Config, - AllowOverlappingExecution: r.AllowOverlappingExecution, - UserTaskTimeoutMs: r.UserTaskTimeoutMs, - SessionParameters: r.SessionParameters, - SuspendTaskAfterNumFailures: r.SuspendTaskAfterNumFailures, - ErrorIntegration: r.ErrorIntegration, - Comment: r.Comment, - Finalize: r.Finalize, - TaskAutoRetryAttempts: r.TaskAutoRetryAttempts, - After: r.After, - When: r.When, - sql: r.sql, + name: r.name, + Schedule: r.Schedule, + Config: r.Config, + AllowOverlappingExecution: r.AllowOverlappingExecution, + UserTaskTimeoutMs: r.UserTaskTimeoutMs, + SessionParameters: r.SessionParameters, + SuspendTaskAfterNumFailures: r.SuspendTaskAfterNumFailures, + ErrorNotificationIntegration: r.ErrorNotificationIntegration, + Comment: r.Comment, + Finalize: r.Finalize, + TaskAutoRetryAttempts: r.TaskAutoRetryAttempts, + After: r.After, + When: r.When, + sql: r.sql, } if r.Warehouse != nil { opts.Warehouse = &CreateTaskWarehouse{ @@ -251,7 +251,7 @@ func (r *AlterTaskRequest) toOpts() *AlterTaskOptions { AllowOverlappingExecution: r.Set.AllowOverlappingExecution, UserTaskTimeoutMs: r.Set.UserTaskTimeoutMs, SuspendTaskAfterNumFailures: r.Set.SuspendTaskAfterNumFailures, - ErrorIntegration: r.Set.ErrorIntegration, + ErrorNotificationIntegration: r.Set.ErrorNotificationIntegration, Comment: r.Set.Comment, SessionParameters: r.Set.SessionParameters, TaskAutoRetryAttempts: r.Set.TaskAutoRetryAttempts, @@ -345,7 +345,12 @@ func (r taskDBRow) convert() *Task { task.Condition = r.Condition.String } if r.ErrorIntegration.Valid && r.ErrorIntegration.String != "null" { - task.ErrorIntegration = r.ErrorIntegration.String + id, err := ParseAccountObjectIdentifier(r.ErrorIntegration.String) + if err != nil { + log.Printf("[DEBUG] failed to parse error_integration: %v", err) + } else { + task.ErrorIntegration = &id + } } if r.LastCommittedOn.Valid { task.LastCommittedOn = r.LastCommittedOn.String diff --git a/pkg/sdk/tasks_validations_gen.go b/pkg/sdk/tasks_validations_gen.go index 6c3679a402..6a5392457b 100644 --- a/pkg/sdk/tasks_validations_gen.go +++ b/pkg/sdk/tasks_validations_gen.go @@ -22,7 +22,7 @@ func (opts *CreateTaskOptions) validate() error { } } if valueSet(opts.Warehouse) { - if !anyValueSet(opts.Warehouse.Warehouse, opts.Warehouse.UserTaskManagedInitialWarehouseSize) { + if !exactlyOneValueSet(opts.Warehouse.Warehouse, opts.Warehouse.UserTaskManagedInitialWarehouseSize) { errs = append(errs, errExactlyOneOf("CreateTaskOptions.Warehouse", "Warehouse", "UserTaskManagedInitialWarehouseSize")) } } @@ -32,6 +32,9 @@ func (opts *CreateTaskOptions) validate() error { if everyValueSet(opts.OrReplace, opts.IfNotExists) { errs = append(errs, errOneOf("CreateTaskOptions", "OrReplace", "IfNotExists")) } + if opts.ErrorNotificationIntegration != nil && !ValidObjectIdentifier(opts.ErrorNotificationIntegration) { + errs = append(errs, errInvalidIdentifier("CreateTaskOptions", "ErrorNotificationIntegration")) + } return JoinErrors(errs...) } @@ -46,13 +49,16 @@ func (opts *CreateOrAlterTaskOptions) validate() error { } } if valueSet(opts.Warehouse) { - if !anyValueSet(opts.Warehouse.Warehouse, opts.Warehouse.UserTaskManagedInitialWarehouseSize) { + if !exactlyOneValueSet(opts.Warehouse.Warehouse, opts.Warehouse.UserTaskManagedInitialWarehouseSize) { errs = append(errs, errExactlyOneOf("CreateOrAlterTaskOptions.CreateTaskWarehouse", "Warehouse", "UserTaskManagedInitialWarehouseSize")) } } if !ValidObjectIdentifier(opts.name) { errs = append(errs, ErrInvalidObjectIdentifier) } + if opts.ErrorNotificationIntegration != nil && !ValidObjectIdentifier(opts.ErrorNotificationIntegration) { + errs = append(errs, errInvalidIdentifier("CreateOrAlterTaskOptions", "ErrorNotificationIntegration")) + } return JoinErrors(errs...) } @@ -87,12 +93,15 @@ func (opts *AlterTaskOptions) validate() error { errs = append(errs, err) } } - if !anyValueSet(opts.Set.Warehouse, opts.Set.UserTaskManagedInitialWarehouseSize, opts.Set.Schedule, opts.Set.Config, opts.Set.AllowOverlappingExecution, opts.Set.UserTaskTimeoutMs, opts.Set.SuspendTaskAfterNumFailures, opts.Set.ErrorIntegration, opts.Set.Comment, opts.Set.SessionParameters, opts.Set.TaskAutoRetryAttempts, opts.Set.UserTaskMinimumTriggerIntervalInSeconds) { + if !anyValueSet(opts.Set.Warehouse, opts.Set.UserTaskManagedInitialWarehouseSize, opts.Set.Schedule, opts.Set.Config, opts.Set.AllowOverlappingExecution, opts.Set.UserTaskTimeoutMs, opts.Set.SuspendTaskAfterNumFailures, opts.Set.ErrorNotificationIntegration, opts.Set.Comment, opts.Set.SessionParameters, opts.Set.TaskAutoRetryAttempts, opts.Set.UserTaskMinimumTriggerIntervalInSeconds) { errs = append(errs, errAtLeastOneOf("AlterTaskOptions.Set", "Warehouse", "UserTaskManagedInitialWarehouseSize", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParameters", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds")) } if everyValueSet(opts.Set.Warehouse, opts.Set.UserTaskManagedInitialWarehouseSize) { errs = append(errs, errOneOf("AlterTaskOptions.Set", "Warehouse", "UserTaskManagedInitialWarehouseSize")) } + if opts.Set.ErrorNotificationIntegration != nil && !ValidObjectIdentifier(opts.Set.ErrorNotificationIntegration) { + errs = append(errs, errInvalidIdentifier("AlterTaskOptions.Set", "ErrorNotificationIntegration")) + } } if valueSet(opts.Unset) { if !anyValueSet(opts.Unset.Warehouse, opts.Unset.Schedule, opts.Unset.Config, opts.Unset.AllowOverlappingExecution, opts.Unset.UserTaskTimeoutMs, opts.Unset.SuspendTaskAfterNumFailures, opts.Unset.ErrorIntegration, opts.Unset.Comment, opts.Unset.SessionParametersUnset, opts.Unset.TaskAutoRetryAttempts, opts.Unset.UserTaskMinimumTriggerIntervalInSeconds) { diff --git a/pkg/sdk/testint/tasks_gen_integration_test.go b/pkg/sdk/testint/tasks_gen_integration_test.go index 68b6cb4fb2..71fca789e2 100644 --- a/pkg/sdk/testint/tasks_gen_integration_test.go +++ b/pkg/sdk/testint/tasks_gen_integration_test.go @@ -1,6 +1,7 @@ package testint import ( + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/helpers/random" "testing" assertions "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" @@ -16,6 +17,17 @@ func TestInt_Tasks(t *testing.T) { ctx := testContext(t) sql := "SELECT CURRENT_TIMESTAMP" + // TODO [SNOW-1017580]: replace with real value + const gcpPubsubSubscriptionName = "projects/project-1234/subscriptions/sub2" + errorIntegrationId := testClientHelper().Ids.RandomAccountObjectIdentifier() + err := client.NotificationIntegrations.Create(ctx, + sdk.NewCreateNotificationIntegrationRequest(errorIntegrationId, true). + WithAutomatedDataLoadsParams(sdk.NewAutomatedDataLoadsParamsRequest(). + WithGoogleAutoParams(sdk.NewGoogleAutoParamsRequest(gcpPubsubSubscriptionName)), + ), + ) + require.NoError(t, err) + assertTask := func(t *testing.T, task *sdk.Task, id sdk.SchemaObjectIdentifier, warehouseName string) { t.Helper() assertions.AssertThat(t, objectassert.TaskFromObject(t, task). @@ -44,7 +56,7 @@ func TestInt_Tasks(t *testing.T) { ) } - assertTaskWithOptions := func(t *testing.T, task *sdk.Task, id sdk.SchemaObjectIdentifier, comment string, warehouse string, schedule string, condition string, allowOverlappingExecution bool, config string, predecessor *sdk.SchemaObjectIdentifier) { + assertTaskWithOptions := func(t *testing.T, task *sdk.Task, id sdk.SchemaObjectIdentifier, comment string, warehouse string, schedule string, condition string, allowOverlappingExecution bool, config string, predecessor *sdk.SchemaObjectIdentifier, errorIntegrationName string) { t.Helper() asserts := objectassert.TaskFromObject(t, task). @@ -61,7 +73,7 @@ func TestInt_Tasks(t *testing.T) { HasDefinition(sql). HasCondition(condition). HasAllowOverlappingExecution(allowOverlappingExecution). - HasErrorIntegration(""). + HasErrorIntegration(errorIntegrationName). HasLastCommittedOn(""). HasLastSuspendedOn(""). HasOwnerRoleType("ROLE"). @@ -137,12 +149,13 @@ func TestInt_Tasks(t *testing.T) { assertTask(t, task, id, "") }) - t.Run("create task: almost complete case", func(t *testing.T) { + t.Run("create task: complete case", func(t *testing.T) { id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - err := testClient(t).Tasks.Create(ctx, sdk.NewCreateTaskRequest(id, sql). + err = testClient(t).Tasks.Create(ctx, sdk.NewCreateTaskRequest(id, sql). WithOrReplace(true). WithWarehouse(*sdk.NewCreateTaskWarehouseRequest().WithWarehouse(testClientHelper().Ids.WarehouseId())). + WithErrorNotificationIntegration(errorIntegrationId). WithSchedule("10 MINUTE"). WithConfig(`$${"output_dir": "/temp/test_directory/", "learning_rate": 0.1}$$`). WithAllowOverlappingExecution(true). @@ -159,7 +172,7 @@ func TestInt_Tasks(t *testing.T) { task, err := testClientHelper().Task.Show(t, id) require.NoError(t, err) - assertTaskWithOptions(t, task, id, "some comment", testClientHelper().Ids.WarehouseId().Name(), "10 MINUTE", `SYSTEM$STREAM_HAS_DATA('MYSTREAM')`, true, `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}`, nil) + assertTaskWithOptions(t, task, id, "some comment", testClientHelper().Ids.WarehouseId().Name(), "10 MINUTE", `SYSTEM$STREAM_HAS_DATA('MYSTREAM')`, true, `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}`, nil, errorIntegrationId.Name()) }) t.Run("create task: with after", func(t *testing.T) { @@ -177,7 +190,7 @@ func TestInt_Tasks(t *testing.T) { task, err := testClientHelper().Task.Show(t, id) require.NoError(t, err) - assertTaskWithOptions(t, task, id, "", "", "", "", false, "", &rootTaskId) + assertTaskWithOptions(t, task, id, "", "", "", "", false, "", &rootTaskId, "") }) t.Run("create task: with after and finalizer", func(t *testing.T) { @@ -205,6 +218,12 @@ func TestInt_Tasks(t *testing.T) { ) }) + // Tested graph + // t1 + // / \ + // root t3 + // \ / + // t2 t.Run("create dag of tasks", func(t *testing.T) { rootId := testClientHelper().Ids.RandomSchemaObjectIdentifier() root, rootCleanup := testClientHelper().Task.CreateWithRequest(t, sdk.NewCreateTaskRequest(rootId, sql).WithSchedule("10 MINUTE")) @@ -276,6 +295,12 @@ func TestInt_Tasks(t *testing.T) { require.ErrorContains(t, err, "Graph has at least one cycle containing task") }) + // Tested graph + // root1 + // \ + // t1 + // / + // root2 t.Run("create dag of tasks - multiple roots", func(t *testing.T) { root1Id := testClientHelper().Ids.RandomSchemaObjectIdentifier() root1, root1Cleanup := testClientHelper().Task.CreateWithRequest(t, sdk.NewCreateTaskRequest(root1Id, sql).WithSchedule("10 MINUTE")) @@ -344,7 +369,17 @@ func TestInt_Tasks(t *testing.T) { }) t.Run("clone task: default", func(t *testing.T) { - sourceTask, taskCleanup := testClientHelper().Task.Create(t) + rootTask, rootTaskCleanup := testClientHelper().Task.Create(t) + t.Cleanup(rootTaskCleanup) + + sourceTaskId := testClientHelper().Ids.RandomSchemaObjectIdentifier() + sourceTask, taskCleanup := testClientHelper().Task.CreateWithRequest(t, sdk.NewCreateTaskRequest(sourceTaskId, sql). + WithAfter([]sdk.SchemaObjectIdentifier{rootTask.ID()}). + WithAllowOverlappingExecution(false). + WithWarehouse(*sdk.NewCreateTaskWarehouseRequest().WithWarehouse(testClientHelper().Ids.WarehouseId())). + WithComment(random.Comment()). + WithWhen(`SYSTEM$STREAM_HAS_DATA('MYSTREAM')`), + ) t.Cleanup(taskCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() @@ -355,7 +390,16 @@ func TestInt_Tasks(t *testing.T) { task, err := client.Tasks.ShowByID(ctx, id) require.NoError(t, err) - assertTask(t, task, id, testClientHelper().Ids.WarehouseId().Name()) + assert.Equal(t, sourceTask.Definition, task.Definition) + assert.Equal(t, sourceTask.Config, task.Config) + assert.Equal(t, sourceTask.Condition, task.Condition) + assert.Equal(t, sourceTask.Warehouse, task.Warehouse) + assert.Equal(t, sourceTask.Predecessors, task.Predecessors) + assert.Equal(t, sourceTask.AllowOverlappingExecution, task.AllowOverlappingExecution) + assert.Equal(t, sourceTask.Comment, task.Comment) + assert.Equal(t, sourceTask.ErrorIntegration, task.ErrorIntegration) + assert.Equal(t, sourceTask.Schedule, task.Schedule) + assert.Equal(t, sourceTask.TaskRelations, task.TaskRelations) }) t.Run("create or alter: complete", func(t *testing.T) { @@ -432,6 +476,7 @@ func TestInt_Tasks(t *testing.T) { err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(task.ID()).WithSet(*sdk.NewTaskSetRequest(). // TODO: Cannot set warehouse due to Snowflake error // WithWarehouse(testClientHelper().Ids.WarehouseId()). + WithErrorNotificationIntegration(errorIntegrationId). WithSchedule("10 MINUTE"). WithConfig(`$${"output_dir": "/temp/test_directory/", "learning_rate": 0.1}$$`). WithAllowOverlappingExecution(true). @@ -445,6 +490,7 @@ func TestInt_Tasks(t *testing.T) { assertions.AssertThat(t, objectassert.Task(t, task.ID()). // HasWarehouse(testClientHelper().Ids.WarehouseId().Name()). + HasErrorIntegration(errorIntegrationId.Name()). HasSchedule("10 MINUTE"). HasConfig(`{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}`). HasAllowOverlappingExecution(true). @@ -452,6 +498,7 @@ func TestInt_Tasks(t *testing.T) { ) err = client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(task.ID()).WithUnset(*sdk.NewTaskUnsetRequest(). + WithErrorIntegration(true). WithWarehouse(true). WithSchedule(true). WithConfig(true). @@ -465,6 +512,7 @@ func TestInt_Tasks(t *testing.T) { require.NoError(t, err) assertions.AssertThat(t, objectassert.Task(t, task.ID()). + HasErrorIntegration(""). HasSchedule(""). HasConfig(""). HasAllowOverlappingExecution(false).