From 1afd17b5ee8f1ad7741dbe17df9ec453b946073e Mon Sep 17 00:00:00 2001
From: Anton Parfenyuk
Date: Thu, 23 Jan 2025 20:35:35 -0500
Subject: [PATCH] feat: add airflow operator and hook for ClickHouse

---
 .../schedulers/airflow/hooks/clickhouse.py    | 33 +++++++++++++++++
 .../airflow/operators/clickhouse.py           | 31 ++++++++++++++++
 sqlmesh/schedulers/airflow/util.py            |  5 +++
 3 files changed, 69 insertions(+)
 create mode 100644 sqlmesh/schedulers/airflow/hooks/clickhouse.py
 create mode 100644 sqlmesh/schedulers/airflow/operators/clickhouse.py

diff --git a/sqlmesh/schedulers/airflow/hooks/clickhouse.py b/sqlmesh/schedulers/airflow/hooks/clickhouse.py
new file mode 100644
index 000000000..b91886e7c
--- /dev/null
+++ b/sqlmesh/schedulers/airflow/hooks/clickhouse.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+import typing as t
+
+import clickhouse_connect
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+from clickhouse_connect.dbapi import connect  # type: ignore
+from clickhouse_connect.dbapi.connection import Connection
+
+
+class SQLMeshClickHouseHook(DbApiHook):
+    """
+    Uses the ClickHouse Python DB API connector.
+    """
+
+    conn_name_attr = "sqlmesh_clickhouse_conn_id"
+    default_conn_name = "sqlmesh_clickhouse_default"
+    conn_type = "sqlmesh_clickhouse"
+    hook_name = "SQLMesh ClickHouse"
+    connector = clickhouse_connect
+
+    def get_conn(self) -> Connection:
+        """Returns a ClickHouse DB API connection object."""
+        db = self.get_connection(getattr(self, t.cast(str, self.conn_name_attr)))
+
+        return connect(
+            host=db.host,
+            port=db.port,
+            user=db.login,
+            password=db.password,
+            database=db.schema,
+            **db.extra_dejson,
+        )
diff --git a/sqlmesh/schedulers/airflow/operators/clickhouse.py b/sqlmesh/schedulers/airflow/operators/clickhouse.py
new file mode 100644
index 000000000..c9b3301ad
--- /dev/null
+++ b/sqlmesh/schedulers/airflow/operators/clickhouse.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+import typing as t
+
+from sqlmesh.schedulers.airflow.hooks.clickhouse import SQLMeshClickHouseHook
+from sqlmesh.schedulers.airflow.operators.base import BaseDbApiOperator
+from sqlmesh.schedulers.airflow.operators.targets import BaseTarget
+
+
+class SQLMeshClickHouseOperator(BaseDbApiOperator):
+    """The operator that evaluates a SQLMesh model snapshot on a ClickHouse target.
+
+    Args:
+        target: The target that will be executed by this operator instance.
+        clickhouse_conn_id: The Airflow connection id for the ClickHouse target.
+ """ + + def __init__( + self, + *, + target: BaseTarget, + clickhouse_conn_id: str = SQLMeshClickHouseHook.default_conn_name, + **kwargs: t.Any, + ) -> None: + super().__init__( + target=target, + conn_id=clickhouse_conn_id, + dialect="clickhouse", + hook_type=SQLMeshClickHouseHook, + **kwargs, + ) diff --git a/sqlmesh/schedulers/airflow/util.py b/sqlmesh/schedulers/airflow/util.py index 1eeb9e620..e3da221a1 100644 --- a/sqlmesh/schedulers/airflow/util.py +++ b/sqlmesh/schedulers/airflow/util.py @@ -122,6 +122,11 @@ def discover_engine_operator(name: str, sql_only: bool = False) -> t.Type[BaseOp name = name.lower() try: + if name == 'clickhouse': + from sqlmesh.schedulers.airflow.operators.clickhouse import ( + SQLMeshClickHouseOperator + ) + return SQLMeshClickHouseOperator if name == "spark": from sqlmesh.schedulers.airflow.operators.spark_submit import ( SQLMeshSparkSubmitOperator,