From 3bf23a4daf600b537abd69bfe5ccdaa70207099e Mon Sep 17 00:00:00 2001 From: Vitaly Markov Date: Sat, 6 Jan 2024 13:18:26 +0000 Subject: [PATCH] Add error_integration for PIPE --- CHANGELOG.md | 5 ++++ snowddl/blueprint/blueprint.py | 1 + snowddl/parser/pipe.py | 13 +++++++-- snowddl/resolver/pipe.py | 8 ++++++ snowddl/version.py | 2 +- .../step1/db1/sc1/file_format/pi002_ff1.yaml | 3 ++ .../_config/step1/db1/sc1/pipe/pi001_pi1.yaml | 8 ++++++ .../_config/step1/db1/sc1/pipe/pi002_pi1.yaml | 16 +++++++++++ .../step1/db1/sc1/stage/pi001_st1.yaml | 2 ++ .../step1/db1/sc1/stage/pi002_st1.yaml | 2 ++ .../step1/db1/sc1/table/pi001_tb1.yaml | 3 ++ .../step1/db1/sc1/table/pi002_tb1.yaml | 3 ++ .../step2/db1/sc1/file_format/pi002_ff1.yaml | 3 ++ .../_config/step2/db1/sc1/pipe/pi001_pi1.yaml | 9 ++++++ .../_config/step2/db1/sc1/pipe/pi002_pi1.yaml | 16 +++++++++++ .../step2/db1/sc1/stage/pi001_st1.yaml | 2 ++ .../step2/db1/sc1/stage/pi002_st1.yaml | 2 ++ .../step2/db1/sc1/table/pi001_tb1.yaml | 3 ++ .../step2/db1/sc1/table/pi002_tb1.yaml | 3 ++ test/_sql/account_setup.sql | 12 ++++++++ test/conftest.py | 11 ++++++++ test/pipe/pi001.py | 28 +++++++++++++++++++ test/pipe/pi002.py | 26 +++++++++++++++++ 23 files changed, 177 insertions(+), 4 deletions(-) create mode 100644 test/_config/step1/db1/sc1/file_format/pi002_ff1.yaml create mode 100644 test/_config/step1/db1/sc1/pipe/pi001_pi1.yaml create mode 100644 test/_config/step1/db1/sc1/pipe/pi002_pi1.yaml create mode 100644 test/_config/step1/db1/sc1/stage/pi001_st1.yaml create mode 100644 test/_config/step1/db1/sc1/stage/pi002_st1.yaml create mode 100644 test/_config/step1/db1/sc1/table/pi001_tb1.yaml create mode 100644 test/_config/step1/db1/sc1/table/pi002_tb1.yaml create mode 100644 test/_config/step2/db1/sc1/file_format/pi002_ff1.yaml create mode 100644 test/_config/step2/db1/sc1/pipe/pi001_pi1.yaml create mode 100644 test/_config/step2/db1/sc1/pipe/pi002_pi1.yaml create mode 100644 test/_config/step2/db1/sc1/stage/pi001_st1.yaml create mode 100644 test/_config/step2/db1/sc1/stage/pi002_st1.yaml create mode 100644 test/_config/step2/db1/sc1/table/pi001_tb1.yaml create mode 100644 test/_config/step2/db1/sc1/table/pi002_tb1.yaml create mode 100644 test/pipe/pi001.py create mode 100644 test/pipe/pi002.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 16235ce..27aadd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## [0.22.1] - 2024-01-06 + +- Added `error_notification` for `PIPE`. +- Added tests for `PIPE`. + ## [0.22.0] - 2023-11-23 - Introduced `NETWORK RULE`, `SECRET`, `EXTERNAL ACCESS INTEGRATION` object types. diff --git a/snowddl/blueprint/blueprint.py b/snowddl/blueprint/blueprint.py index 5688c28..bf1e3c5 100644 --- a/snowddl/blueprint/blueprint.py +++ b/snowddl/blueprint/blueprint.py @@ -203,6 +203,7 @@ class PipeBlueprint(SchemaObjectBlueprint): copy_options: Optional[Dict[str, Union[bool, float, int, str, list]]] = None aws_sns_topic: Optional[str] = None integration: Optional[Ident] = None + error_integration: Optional[Ident] = None class PrimaryKeyBlueprint(SchemaObjectBlueprint): diff --git a/snowddl/parser/pipe.py b/snowddl/parser/pipe.py index 15afa55..03ab353 100644 --- a/snowddl/parser/pipe.py +++ b/snowddl/parser/pipe.py @@ -49,6 +49,9 @@ "integration": { "type": "string" }, + "error_integration": { + "type": "string" + }, "comment": { "type": "string" } @@ -66,6 +69,11 @@ def load_blueprints(self): def process_pipe(self, f: ParsedFile): copy = f.params["copy"] + if copy.get("file_format"): + file_format = build_schema_object_ident(self.env_prefix, copy.get("file_format"), f.database, f.schema) + else: + file_format = None + bp = PipeBlueprint( full_name=SchemaObjectIdent(self.env_prefix, f.database, f.schema, f.name), auto_ingest=f.params["auto_ingest"], @@ -74,12 +82,11 @@ def process_pipe(self, f: ParsedFile): copy_path=copy.get("path"), copy_pattern=copy.get("pattern"), copy_transform=self.normalise_params_dict(copy.get("transform")), - copy_file_format=build_schema_object_ident(self.env_prefix, copy.get("file_format"), f.database, f.schema) - if copy.get("file_format") - else None, + copy_file_format=file_format, copy_options=self.normalise_params_dict(copy.get("options")), aws_sns_topic=f.params.get("aws_sns_topic"), integration=Ident(f.params["integration"]) if f.params.get("integration") else None, + error_integration=Ident(f.params["error_integration"]) if f.params.get("error_integration") else None, comment=f.params.get("comment"), ) diff --git a/snowddl/resolver/pipe.py b/snowddl/resolver/pipe.py index 4f5db40..ed96d66 100644 --- a/snowddl/resolver/pipe.py +++ b/snowddl/resolver/pipe.py @@ -126,6 +126,14 @@ def _build_common_pipe_sql(self, bp: PipeBlueprint): }, ) + if bp.error_integration: + query.append_nl( + "ERROR_INTEGRATION = {error_integration:i}", + { + "error_integration": bp.error_integration, + }, + ) + if bp.comment: query.append_nl( "COMMENT = {comment}", diff --git a/snowddl/version.py b/snowddl/version.py index 5963297..d74a474 100644 --- a/snowddl/version.py +++ b/snowddl/version.py @@ -1 +1 @@ -__version__ = "0.22.0" +__version__ = "0.22.1" diff --git a/test/_config/step1/db1/sc1/file_format/pi002_ff1.yaml b/test/_config/step1/db1/sc1/file_format/pi002_ff1.yaml new file mode 100644 index 0000000..3bddb69 --- /dev/null +++ b/test/_config/step1/db1/sc1/file_format/pi002_ff1.yaml @@ -0,0 +1,3 @@ +type: AVRO +format_options: + compression: ZSTD diff --git a/test/_config/step1/db1/sc1/pipe/pi001_pi1.yaml b/test/_config/step1/db1/sc1/pipe/pi001_pi1.yaml new file mode 100644 index 0000000..8522527 --- /dev/null +++ b/test/_config/step1/db1/sc1/pipe/pi001_pi1.yaml @@ -0,0 +1,8 @@ +copy: + table: pi001_tb1 + stage: pi001_st1 + pattern: ".*[.]csv" + +auto_ingest: false + +comment: abc diff --git a/test/_config/step1/db1/sc1/pipe/pi002_pi1.yaml b/test/_config/step1/db1/sc1/pipe/pi002_pi1.yaml new file mode 100644 index 0000000..72fb1bd --- /dev/null +++ b/test/_config/step1/db1/sc1/pipe/pi002_pi1.yaml @@ -0,0 +1,16 @@ +copy: + table: pi002_tb1 + stage: pi002_st1 + + path: /abc/cde + file_format: pi002_ff1 + + transform: + id: "GET($1, 'id')" + name: "GET($1, 'name')" + + options: + ON_ERROR: CONTINUE + ENFORCE_LENGTH: true + +auto_ingest: false diff --git a/test/_config/step1/db1/sc1/stage/pi001_st1.yaml b/test/_config/step1/db1/sc1/stage/pi001_st1.yaml new file mode 100644 index 0000000..69cb1d9 --- /dev/null +++ b/test/_config/step1/db1/sc1/stage/pi001_st1.yaml @@ -0,0 +1,2 @@ +url: gcs://test-bucket1 +storage_integration: test_storage_integration diff --git a/test/_config/step1/db1/sc1/stage/pi002_st1.yaml b/test/_config/step1/db1/sc1/stage/pi002_st1.yaml new file mode 100644 index 0000000..69cb1d9 --- /dev/null +++ b/test/_config/step1/db1/sc1/stage/pi002_st1.yaml @@ -0,0 +1,2 @@ +url: gcs://test-bucket1 +storage_integration: test_storage_integration diff --git a/test/_config/step1/db1/sc1/table/pi001_tb1.yaml b/test/_config/step1/db1/sc1/table/pi001_tb1.yaml new file mode 100644 index 0000000..c529f34 --- /dev/null +++ b/test/_config/step1/db1/sc1/table/pi001_tb1.yaml @@ -0,0 +1,3 @@ +columns: + id: NUMBER(38,0) + name: VARCHAR(255) diff --git a/test/_config/step1/db1/sc1/table/pi002_tb1.yaml b/test/_config/step1/db1/sc1/table/pi002_tb1.yaml new file mode 100644 index 0000000..c529f34 --- /dev/null +++ b/test/_config/step1/db1/sc1/table/pi002_tb1.yaml @@ -0,0 +1,3 @@ +columns: + id: NUMBER(38,0) + name: VARCHAR(255) diff --git a/test/_config/step2/db1/sc1/file_format/pi002_ff1.yaml b/test/_config/step2/db1/sc1/file_format/pi002_ff1.yaml new file mode 100644 index 0000000..3bddb69 --- /dev/null +++ b/test/_config/step2/db1/sc1/file_format/pi002_ff1.yaml @@ -0,0 +1,3 @@ +type: AVRO +format_options: + compression: ZSTD diff --git a/test/_config/step2/db1/sc1/pipe/pi001_pi1.yaml b/test/_config/step2/db1/sc1/pipe/pi001_pi1.yaml new file mode 100644 index 0000000..5874b44 --- /dev/null +++ b/test/_config/step2/db1/sc1/pipe/pi001_pi1.yaml @@ -0,0 +1,9 @@ +copy: + table: pi001_tb1 + stage: pi001_st1 + pattern: ".*[.]csv[.]gz" + +auto_ingest: false +error_integration: test_notification_integration + +comment: cde diff --git a/test/_config/step2/db1/sc1/pipe/pi002_pi1.yaml b/test/_config/step2/db1/sc1/pipe/pi002_pi1.yaml new file mode 100644 index 0000000..9bb6e79 --- /dev/null +++ b/test/_config/step2/db1/sc1/pipe/pi002_pi1.yaml @@ -0,0 +1,16 @@ +copy: + table: pi002_tb1 + stage: pi002_st1 + + path: /abc/cde/fgh + file_format: pi002_ff1 + + transform: + id: "GET($1, 'id')" + name: "GET($1, 'another_name')" + + options: + ON_ERROR: SKIP_FILE + ENFORCE_LENGTH: false + +auto_ingest: false diff --git a/test/_config/step2/db1/sc1/stage/pi001_st1.yaml b/test/_config/step2/db1/sc1/stage/pi001_st1.yaml new file mode 100644 index 0000000..69cb1d9 --- /dev/null +++ b/test/_config/step2/db1/sc1/stage/pi001_st1.yaml @@ -0,0 +1,2 @@ +url: gcs://test-bucket1 +storage_integration: test_storage_integration diff --git a/test/_config/step2/db1/sc1/stage/pi002_st1.yaml b/test/_config/step2/db1/sc1/stage/pi002_st1.yaml new file mode 100644 index 0000000..69cb1d9 --- /dev/null +++ b/test/_config/step2/db1/sc1/stage/pi002_st1.yaml @@ -0,0 +1,2 @@ +url: gcs://test-bucket1 +storage_integration: test_storage_integration diff --git a/test/_config/step2/db1/sc1/table/pi001_tb1.yaml b/test/_config/step2/db1/sc1/table/pi001_tb1.yaml new file mode 100644 index 0000000..c529f34 --- /dev/null +++ b/test/_config/step2/db1/sc1/table/pi001_tb1.yaml @@ -0,0 +1,3 @@ +columns: + id: NUMBER(38,0) + name: VARCHAR(255) diff --git a/test/_config/step2/db1/sc1/table/pi002_tb1.yaml b/test/_config/step2/db1/sc1/table/pi002_tb1.yaml new file mode 100644 index 0000000..c529f34 --- /dev/null +++ b/test/_config/step2/db1/sc1/table/pi002_tb1.yaml @@ -0,0 +1,3 @@ +columns: + id: NUMBER(38,0) + name: VARCHAR(255) diff --git a/test/_sql/account_setup.sql b/test/_sql/account_setup.sql index f5e4013..bcc185c 100644 --- a/test/_sql/account_setup.sql +++ b/test/_sql/account_setup.sql @@ -81,3 +81,15 @@ OAUTH_ALLOWED_SCOPES = ('photo', 'offline_access') ENABLED = TRUE; GRANT USAGE ON INTEGRATION TEST_API_SECURITY_INTEGRATION TO ROLE SNOWDDL_ADMIN; + +--- + +CREATE NOTIFICATION INTEGRATION TEST_NOTIFICATION_INTEGRATION +DIRECTION = OUTBOUND +TYPE = QUEUE +NOTIFICATION_PROVIDER=AWS_SNS +AWS_SNS_ROLE_ARN='arn:aws:iam::123456789012:role/my_cloud_account_role' +AWS_SNS_TOPIC_ARN='arn:aws:sns:us-east-1:123456789012:MyTopic' +ENABLED=TRUE; + +GRANT USAGE ON INTEGRATION TEST_NOTIFICATION_INTEGRATION TO ROLE SNOWDDL_ADMIN; diff --git a/test/conftest.py b/test/conftest.py index 6e07e61..8d67927 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -222,6 +222,17 @@ def show_file_format(self, database, schema, name): return cur.fetchone() + def show_pipe(self, database, schema, name): + cur = self.execute( + "SHOW PIPES LIKE {format_name:lf} IN SCHEMA {schema_name:i}", + { + "schema_name": SchemaIdent(self.env_prefix, database, schema), + "format_name": Ident(name), + }, + ) + + return cur.fetchone() + def show_user(self, name): cur = self.execute( "SHOW USERS LIKE {user_name:lf}", diff --git a/test/pipe/pi001.py b/test/pipe/pi001.py new file mode 100644 index 0000000..d31c461 --- /dev/null +++ b/test/pipe/pi001.py @@ -0,0 +1,28 @@ +def test_step1(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi001_pi1") + + assert "PI001_TB1" in pipe_show["definition"] + assert "PI001_ST1" in pipe_show["definition"] + + assert pipe_show["error_integration"] is None + assert pipe_show["pattern"] == ".*[.]csv" + + assert pipe_show["comment"].startswith("abc #") + + +def test_step2(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi001_pi1") + + assert "PI001_TB1" in pipe_show["definition"] + assert "PI001_ST1" in pipe_show["definition"] + + assert pipe_show["error_integration"] == "TEST_NOTIFICATION_INTEGRATION" + assert pipe_show["pattern"] == ".*[.]csv[.]gz" + + assert pipe_show["comment"].startswith("cde #") + + +def test_step3(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi001_pi1") + + assert pipe_show is None diff --git a/test/pipe/pi002.py b/test/pipe/pi002.py new file mode 100644 index 0000000..efd4a9b --- /dev/null +++ b/test/pipe/pi002.py @@ -0,0 +1,26 @@ +def test_step1(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi002_pi1") + + assert "PI002_TB1" in pipe_show["definition"] + assert "PI002_ST1" in pipe_show["definition"] + assert "PI002_FF1" in pipe_show["definition"] + + assert "\"/abc/cde" in pipe_show["definition"] + assert "ENFORCE_LENGTH = TRUE" in pipe_show["definition"] + + +def test_step2(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi002_pi1") + + assert "PI002_TB1" in pipe_show["definition"] + assert "PI002_ST1" in pipe_show["definition"] + assert "PI002_FF1" in pipe_show["definition"] + + assert "\"/abc/cde/fgh" in pipe_show["definition"] + assert "ENFORCE_LENGTH = FALSE" in pipe_show["definition"] + + +def test_step3(helper): + pipe_show = helper.show_pipe("db1", "sc1", "pi002_pi1") + + assert pipe_show is None