From cbc271d4d5a76c5f762e7e8a12d14d9af01e07f3 Mon Sep 17 00:00:00 2001 From: Arthur Shing Date: Wed, 13 Nov 2024 12:04:13 -0800 Subject: [PATCH] Add fail_on_error parameter to SnowflakeOperator (#179) * Bugfix to raise exception on failure * Added fail_on_error to SF Operator * Updated SnowflakeOperator documentation --- .../databricks/uc_to_snowflake_operator.py | 16 +++++++---- docs/faq/faq.md | 3 ++- docs/tasks.md | 27 ++++++++++--------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/brickflow_plugins/databricks/uc_to_snowflake_operator.py b/brickflow_plugins/databricks/uc_to_snowflake_operator.py index 8046ca1..d174ff4 100644 --- a/brickflow_plugins/databricks/uc_to_snowflake_operator.py +++ b/brickflow_plugins/databricks/uc_to_snowflake_operator.py @@ -65,6 +65,7 @@ class SnowflakeOperator: query_string : Optional parameter with queries separeted by semicolon(;) sql_file : Optional parameter with file path (relative to brickflow project root) to .sql file parameters: optional parameter dictionary with key value pairs to substitute in the query + fail_on_error: Optional parameter to fail the task on error, defaults to True """ def __init__( @@ -73,9 +74,11 @@ def __init__( query_string=None, sql_file=None, parameters={}, + fail_on_error=True, *args, **kwargs, ): + self.conn = None self.cur = None self.query = None self.sql_file = None @@ -83,6 +86,7 @@ def __init__( self.log = log self.query = query_string self.parameters = parameters + self.fail_on_error = fail_on_error self.sql_file = sql_file self.brickflow_root = get_bf_project_root() @@ -199,12 +203,12 @@ def get_cursor(self): self.secret_scope ) ) - con = self.get_snowflake_connection() + self.conn = self.get_snowflake_connection() except snowflake.connector.errors.ProgrammingError as e: raise ValueError( "Error {0} ({1}): {2} ({3})".format(e.errno, e.sqlstate, e.msg, e.sfqid) ) - self.cur = con.cursor() + self.cur = self.conn.cursor() def snowflake_query_exec(self, cur, database, 
query_string): """ @@ -264,10 +268,12 @@ def execute(self): # Run the query against SnowFlake try: self.snowflake_query_exec(self.cur, self.database, self.query) - except: - self.log.error("failed to execute") + except Exception as e: + if self.fail_on_error: + raise e + self.log.exception(f"Failed to execute") finally: - self.cur.close() + self.conn.close() self.log.info("Closed connection") diff --git a/docs/faq/faq.md b/docs/faq/faq.md index 2787ee6..6ba813e 100644 --- a/docs/faq/faq.md +++ b/docs/faq/faq.md @@ -96,7 +96,8 @@ def run_snowflake_queries(*args): sf_query_run = SnowflakeOperator( secret_scope = "your_databricks secrets scope name", query_string = "string of queries separated by semicolon(;)", - parameters={"key1":"value1", "key2":"value2"} + parameters={"key1":"value1", "key2":"value2"}, + fail_on_error=True, ) sf_query_run.execute() ``` diff --git a/docs/tasks.md b/docs/tasks.md index 2cbeb4d..4b24f0e 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -697,22 +697,24 @@ def wait_on_workflow(*args): #### Snowflake Operator -run snowflake queries from the databricks environment +Run snowflake queries from the databricks environment As databricks secrets is a key value store, code expects the secret scope to contain the below exact keys -    username : user id created for connecting to snowflake for ex: sample_user -    password : password information for about user for ex: P@$$word -    account : snowflake account information, not entire url for ex: sample_enterprise -    warehouse: warehouse/cluster information that user has access for ex: sample_warehouse -    database : default database that we want to connect for ex: sample_database -    role : role to which the user has write access for ex: sample_write_role + +- `username` : user id created for connecting to snowflake for ex: sample_user +- `password` : password information for the above user for ex: P@$$word +- `account` : snowflake account information, not entire url for ex: sample_enterprise +- 
`warehouse`: warehouse/cluster information that the user has access to for ex: sample_warehouse +- `database` : default database that we want to connect to for ex: sample_database +- `role` : role to which the user has write access for ex: sample_write_role SnowflakeOperator can accept the following as inputs -    secret_scope (required) : databricks secret scope identifier -    query_string (required) : queries separated by semicolon -    sql_file (optional) : path to the sql file -    parameters (optional) : dictionary with variables that can be used to substitute in queries +- `secret_scope` : **(required)** databricks secret scope identifier +- `query_string` : **(required unless `sql_file` is passed)** queries separated by semicolon +- `sql_file` : (optional) path to the sql file +- `parameters` : (optional) dictionary with variables that can be used to substitute in queries +- `fail_on_error` : (optional) bool to fail the task if there is a sql error, default is True Operator only takes one of either query_string or sql_file needs to be passed @@ -726,7 +728,8 @@ def run_snowflake_queries(*args): sf_query_run = SnowflakeOperator( secret_scope = "your_databricks secrets scope name", query_string ="select * from database.$schema.$table where $filter_condition1; select * from sample_schema.test_table", - parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"} + parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"}, + fail_on_error=True, ) sf_query_run.execute()