Add fail_on_error parameter to SnowflakeOperator (#179)
* Bugfix to raise exception on failure

* Added fail_on_error to SF Operator

* Updated SnowflakeOperator documentation
shinga authored Nov 13, 2024
1 parent 3bfd80a commit cbc271d
Showing 3 changed files with 28 additions and 18 deletions.
16 changes: 11 additions & 5 deletions brickflow_plugins/databricks/uc_to_snowflake_operator.py
@@ -65,6 +65,7 @@ class SnowflakeOperator:
query_string : Optional parameter with queries separated by semicolon(;)
sql_file : Optional parameter with file path (relative to brickflow project root) to .sql file
parameters: optional parameter dictionary with key value pairs to substitute in the query
fail_on_error: Optional parameter to fail the task on error, defaults to True
"""

def __init__(
@@ -73,16 +74,19 @@ def __init__(
query_string=None,
sql_file=None,
parameters={},
fail_on_error=True,
*args,
**kwargs,
):
self.conn = None
self.cur = None
self.query = None
self.sql_file = None
self.secret_scope = secret_scope
self.log = log
self.query = query_string
self.parameters = parameters
self.fail_on_error = fail_on_error
self.sql_file = sql_file
self.brickflow_root = get_bf_project_root()

@@ -199,12 +203,12 @@ def get_cursor(self):
self.secret_scope
)
)
con = self.get_snowflake_connection()
self.conn = self.get_snowflake_connection()
except snowflake.connector.errors.ProgrammingError as e:
raise ValueError(
"Error {0} ({1}): {2} ({3})".format(e.errno, e.sqlstate, e.msg, e.sfqid)
)
self.cur = con.cursor()
self.cur = self.conn.cursor()

def snowflake_query_exec(self, cur, database, query_string):
"""
@@ -264,10 +268,12 @@ def execute(self):
# Run the query against SnowFlake
try:
self.snowflake_query_exec(self.cur, self.database, self.query)
except:
self.log.error("failed to execute")
except Exception as e:
if self.fail_on_error:
raise e
self.log.exception(f"Failed to execute")
finally:
self.cur.close()
self.conn.close()
self.log.info("Closed connection")
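Taken together with the `get_cursor` change above (the connection is now kept on `self.conn` so the `finally` block can close it), the new control flow boils down to the pattern below. This is a minimal standalone sketch of that pattern, not the operator's actual code; `execute_with_policy` and `run_query` are illustrative names.

```python
import logging

log = logging.getLogger(__name__)


def execute_with_policy(conn, run_query, fail_on_error=True):
    """Run a query callable on an open connection, honouring fail_on_error."""
    cur = conn.cursor()
    try:
        run_query(cur)
    except Exception as e:
        if fail_on_error:
            raise e  # surface the failure so the task fails
        log.exception("Failed to execute")  # otherwise log it and move on
    finally:
        cur.close()
        conn.close()  # connection is released on both paths
        log.info("Closed connection")
```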


3 changes: 2 additions & 1 deletion docs/faq/faq.md
@@ -96,7 +96,8 @@ def run_snowflake_queries(*args):
sf_query_run = SnowflakeOperator(
secret_scope = "your_databricks secrets scope name",
query_string = "string of queries separated by semicolon(;)",
parameters={"key1":"value1", "key2":"value2"}
parameters={"key1":"value1", "key2":"value2"},
fail_on_error=True,
)
sf_query_run.execute()
```
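For comparison, a hypothetical variant of the call above with the flag flipped: a SQL error is then logged and the task keeps running instead of failing.

```python
sf_query_run = SnowflakeOperator(
    secret_scope="your_databricks secrets scope name",
    query_string="string of queries separated by semicolon(;)",
    parameters={"key1": "value1", "key2": "value2"},
    fail_on_error=False,  # log the error instead of raising it
)
sf_query_run.execute()
```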
27 changes: 15 additions & 12 deletions docs/tasks.md
@@ -697,22 +697,24 @@ def wait_on_workflow(*args):

#### Snowflake Operator

run snowflake queries from the databricks environment
Run snowflake queries from the databricks environment

As Databricks secrets is a key-value store, the code expects the secret scope to contain exactly the keys below (a connection sketch using these keys follows the list):
    username : user id created for connecting to snowflake for ex: sample_user
    password : password information for about user for ex: P@$$word
    account : snowflake account information, not entire url for ex: sample_enterprise
    warehouse: warehouse/cluster information that user has access for ex: sample_warehouse
    database : default database that we want to connect for ex: sample_database
    role : role to which the user has write access for ex: sample_write_role

- `username` : user id created for connecting to snowflake, for ex: sample_user
- `password` : password for the above user, for ex: P@$$word
- `account` : snowflake account identifier (not the entire url), for ex: sample_enterprise
- `warehouse`: warehouse/cluster the user has access to, for ex: sample_warehouse
- `database` : default database to connect to, for ex: sample_database
- `role` : role through which the user has write access, for ex: sample_write_role
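As a rough illustration of how those keys fit together (not the operator's actual wiring), the sketch below maps them onto a Snowflake connection; it assumes a Databricks runtime with `dbutils` available and the `snowflake-connector-python` package.

```python
import snowflake.connector


def connect_from_scope(dbutils, scope: str):
    """Build a Snowflake connection from the six secret-scope keys listed above."""

    def get(key):
        return dbutils.secrets.get(scope=scope, key=key)

    return snowflake.connector.connect(
        user=get("username"),
        password=get("password"),
        account=get("account"),
        warehouse=get("warehouse"),
        database=get("database"),
        role=get("role"),
    )
```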

SnowflakeOperator can accept the following as inputs

    secret_scope (required) : databricks secret scope identifier
    query_string (required) : queries separated by semicolon
    sql_file (optional) : path to the sql file
    parameters (optional) : dictionary with variables that can be used to substitute in queries
- `secret_scope` : **(required)** databricks secret scope identifier
- `query_string` : **(required)** queries separated by semicolon
- `sql_file` : (optional) path to the sql file
- `parameters` : (optional) dictionary with variables that can be used to substitute in queries
- `fail_on_error` : (optional) bool to fail the task if there is a sql error, default is True

The operator takes exactly one of query_string or sql_file; pass only one of the two (a sql_file variant is sketched after the example below).

@@ -726,7 +728,8 @@ def run_snowflake_queries(*args):
sf_query_run = SnowflakeOperator(
secret_scope = "your_databricks secrets scope name",
query_string ="select * from database.$schema.$table where $filter_condition1; select * from sample_schema.test_table",
parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"}
parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"},
fail_on_error=True,
)
sf_query_run.execute()

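Since only one of `query_string` or `sql_file` may be supplied, here is a hypothetical variant that points the operator at a .sql file instead; the file path shown is illustrative.

```python
sf_file_run = SnowflakeOperator(
    secret_scope="your_databricks secrets scope name",
    sql_file="src/sql/sample_queries.sql",  # relative to the brickflow project root
    parameters={"schema": "test_schema", "table": "sample_table"},
    fail_on_error=True,
)
sf_file_run.execute()
```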
