Skip to content

Commit

Permalink
Add error_integration for PIPE
Browse files Browse the repository at this point in the history
  • Loading branch information
littleK0i committed Jan 6, 2024
1 parent 5faa184 commit 3bf23a4
Show file tree
Hide file tree
Showing 23 changed files with 177 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## [0.22.1] - 2024-01-06

- Added `error_integration` for `PIPE`.
- Added tests for `PIPE`.

## [0.22.0] - 2023-11-23

- Introduced `NETWORK RULE`, `SECRET`, `EXTERNAL ACCESS INTEGRATION` object types.
Expand Down
1 change: 1 addition & 0 deletions snowddl/blueprint/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class PipeBlueprint(SchemaObjectBlueprint):
copy_options: Optional[Dict[str, Union[bool, float, int, str, list]]] = None
aws_sns_topic: Optional[str] = None
integration: Optional[Ident] = None
error_integration: Optional[Ident] = None


class PrimaryKeyBlueprint(SchemaObjectBlueprint):
Expand Down
13 changes: 10 additions & 3 deletions snowddl/parser/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
"integration": {
"type": "string"
},
"error_integration": {
"type": "string"
},
"comment": {
"type": "string"
}
Expand All @@ -66,6 +69,11 @@ def load_blueprints(self):
def process_pipe(self, f: ParsedFile):
copy = f.params["copy"]

if copy.get("file_format"):
file_format = build_schema_object_ident(self.env_prefix, copy.get("file_format"), f.database, f.schema)
else:
file_format = None

bp = PipeBlueprint(
full_name=SchemaObjectIdent(self.env_prefix, f.database, f.schema, f.name),
auto_ingest=f.params["auto_ingest"],
Expand All @@ -74,12 +82,11 @@ def process_pipe(self, f: ParsedFile):
copy_path=copy.get("path"),
copy_pattern=copy.get("pattern"),
copy_transform=self.normalise_params_dict(copy.get("transform")),
copy_file_format=build_schema_object_ident(self.env_prefix, copy.get("file_format"), f.database, f.schema)
if copy.get("file_format")
else None,
copy_file_format=file_format,
copy_options=self.normalise_params_dict(copy.get("options")),
aws_sns_topic=f.params.get("aws_sns_topic"),
integration=Ident(f.params["integration"]) if f.params.get("integration") else None,
error_integration=Ident(f.params["error_integration"]) if f.params.get("error_integration") else None,
comment=f.params.get("comment"),
)

Expand Down
8 changes: 8 additions & 0 deletions snowddl/resolver/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ def _build_common_pipe_sql(self, bp: PipeBlueprint):
},
)

if bp.error_integration:
query.append_nl(
"ERROR_INTEGRATION = {error_integration:i}",
{
"error_integration": bp.error_integration,
},
)

if bp.comment:
query.append_nl(
"COMMENT = {comment}",
Expand Down
2 changes: 1 addition & 1 deletion snowddl/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.22.0"
__version__ = "0.22.1"
3 changes: 3 additions & 0 deletions test/_config/step1/db1/sc1/file_format/pi002_ff1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
type: AVRO
format_options:
compression: ZSTD
8 changes: 8 additions & 0 deletions test/_config/step1/db1/sc1/pipe/pi001_pi1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
copy:
table: pi001_tb1
stage: pi001_st1
pattern: ".*[.]csv"

auto_ingest: false

comment: abc
16 changes: 16 additions & 0 deletions test/_config/step1/db1/sc1/pipe/pi002_pi1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
copy:
table: pi002_tb1
stage: pi002_st1

path: /abc/cde
file_format: pi002_ff1

transform:
id: "GET($1, 'id')"
name: "GET($1, 'name')"

options:
ON_ERROR: CONTINUE
ENFORCE_LENGTH: true

auto_ingest: false
2 changes: 2 additions & 0 deletions test/_config/step1/db1/sc1/stage/pi001_st1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
url: gcs://test-bucket1
storage_integration: test_storage_integration
2 changes: 2 additions & 0 deletions test/_config/step1/db1/sc1/stage/pi002_st1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
url: gcs://test-bucket1
storage_integration: test_storage_integration
3 changes: 3 additions & 0 deletions test/_config/step1/db1/sc1/table/pi001_tb1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
columns:
id: NUMBER(38,0)
name: VARCHAR(255)
3 changes: 3 additions & 0 deletions test/_config/step1/db1/sc1/table/pi002_tb1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
columns:
id: NUMBER(38,0)
name: VARCHAR(255)
3 changes: 3 additions & 0 deletions test/_config/step2/db1/sc1/file_format/pi002_ff1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
type: AVRO
format_options:
compression: ZSTD
9 changes: 9 additions & 0 deletions test/_config/step2/db1/sc1/pipe/pi001_pi1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
copy:
table: pi001_tb1
stage: pi001_st1
pattern: ".*[.]csv[.]gz"

auto_ingest: false
error_integration: test_notification_integration

comment: cde
16 changes: 16 additions & 0 deletions test/_config/step2/db1/sc1/pipe/pi002_pi1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
copy:
table: pi002_tb1
stage: pi002_st1

path: /abc/cde/fgh
file_format: pi002_ff1

transform:
id: "GET($1, 'id')"
name: "GET($1, 'another_name')"

options:
ON_ERROR: SKIP_FILE
ENFORCE_LENGTH: false

auto_ingest: false
2 changes: 2 additions & 0 deletions test/_config/step2/db1/sc1/stage/pi001_st1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
url: gcs://test-bucket1
storage_integration: test_storage_integration
2 changes: 2 additions & 0 deletions test/_config/step2/db1/sc1/stage/pi002_st1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
url: gcs://test-bucket1
storage_integration: test_storage_integration
3 changes: 3 additions & 0 deletions test/_config/step2/db1/sc1/table/pi001_tb1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
columns:
id: NUMBER(38,0)
name: VARCHAR(255)
3 changes: 3 additions & 0 deletions test/_config/step2/db1/sc1/table/pi002_tb1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
columns:
id: NUMBER(38,0)
name: VARCHAR(255)
12 changes: 12 additions & 0 deletions test/_sql/account_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,15 @@ OAUTH_ALLOWED_SCOPES = ('photo', 'offline_access')
ENABLED = TRUE;

GRANT USAGE ON INTEGRATION TEST_API_SECURITY_INTEGRATION TO ROLE SNOWDDL_ADMIN;

---

-- Outbound SNS notification integration referenced by the PIPE error_integration
-- test config (step2/db1/sc1/pipe/pi001_pi1.yaml); ARNs are dummy test values.
CREATE NOTIFICATION INTEGRATION TEST_NOTIFICATION_INTEGRATION
DIRECTION = OUTBOUND
TYPE = QUEUE
NOTIFICATION_PROVIDER=AWS_SNS
AWS_SNS_ROLE_ARN='arn:aws:iam::123456789012:role/my_cloud_account_role'
AWS_SNS_TOPIC_ARN='arn:aws:sns:us-east-1:123456789012:MyTopic'
ENABLED=TRUE;

-- Let the SnowDDL admin role attach this integration to pipes
GRANT USAGE ON INTEGRATION TEST_NOTIFICATION_INTEGRATION TO ROLE SNOWDDL_ADMIN;
11 changes: 11 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,17 @@ def show_file_format(self, database, schema, name):

return cur.fetchone()

def show_pipe(self, database, schema, name):
    """Return the SHOW PIPES row for one pipe, or None if it does not exist."""
    schema_ident = SchemaIdent(self.env_prefix, database, schema)
    pipe_ident = Ident(name)

    cur = self.execute(
        "SHOW PIPES LIKE {format_name:lf} IN SCHEMA {schema_name:i}",
        {
            "format_name": pipe_ident,
            "schema_name": schema_ident,
        },
    )

    return cur.fetchone()

def show_user(self, name):
cur = self.execute(
"SHOW USERS LIKE {user_name:lf}",
Expand Down
28 changes: 28 additions & 0 deletions test/pipe/pi001.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
def test_step1(helper):
    """Step 1: pipe exists without an error integration, using the initial pattern/comment."""
    row = helper.show_pipe("db1", "sc1", "pi001_pi1")
    definition = row["definition"]

    # COPY definition must reference the configured target table and stage.
    assert "PI001_TB1" in definition
    assert "PI001_ST1" in definition

    assert row["error_integration"] is None
    assert row["pattern"] == ".*[.]csv"

    # Config comment "abc" plus an appended " #..." suffix — presumably a config hash.
    assert row["comment"].startswith("abc #")


def test_step2(helper):
    """Step 2: error integration attached; pattern and comment updated from config."""
    row = helper.show_pipe("db1", "sc1", "pi001_pi1")
    definition = row["definition"]

    # COPY definition still references the same target table and stage.
    assert "PI001_TB1" in definition
    assert "PI001_ST1" in definition

    assert row["error_integration"] == "TEST_NOTIFICATION_INTEGRATION"
    assert row["pattern"] == ".*[.]csv[.]gz"

    # Config comment "cde" plus an appended " #..." suffix — presumably a config hash.
    assert row["comment"].startswith("cde #")


def test_step3(helper):
    """Step 3: pipe removed from config, so SHOW PIPES returns no row."""
    assert helper.show_pipe("db1", "sc1", "pi001_pi1") is None
26 changes: 26 additions & 0 deletions test/pipe/pi002.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
def test_step1(helper):
    """Step 1: pipe created with file format, path prefix and copy options."""
    row = helper.show_pipe("db1", "sc1", "pi002_pi1")
    definition = row["definition"]

    # Definition references the configured table, stage and file format.
    assert "PI002_TB1" in definition
    assert "PI002_ST1" in definition
    assert "PI002_FF1" in definition

    # Quoted stage path and rendered copy option from step1 config.
    assert '"/abc/cde' in definition
    assert "ENFORCE_LENGTH = TRUE" in definition


def test_step2(helper):
    """Step 2: path and copy options updated per step2 config."""
    row = helper.show_pipe("db1", "sc1", "pi002_pi1")
    definition = row["definition"]

    # Definition still references the same table, stage and file format.
    assert "PI002_TB1" in definition
    assert "PI002_ST1" in definition
    assert "PI002_FF1" in definition

    # Updated quoted stage path and copy option from step2 config.
    assert '"/abc/cde/fgh' in definition
    assert "ENFORCE_LENGTH = FALSE" in definition


def test_step3(helper):
    """Step 3: pipe removed from config, so SHOW PIPES returns no row."""
    assert helper.show_pipe("db1", "sc1", "pi002_pi1") is None

0 comments on commit 3bf23a4

Please sign in to comment.