feat: add airflow operator and hook for ClickHouse
Spar9a committed Jan 24, 2025
1 parent 2bb8f7f commit 1afd17b
Showing 3 changed files with 74 additions and 0 deletions.
37 changes: 37 additions & 0 deletions sqlmesh/schedulers/airflow/hooks/clickhouse.py
@@ -0,0 +1,37 @@
from __future__ import annotations

import typing as t

import clickhouse_connect
from airflow.providers.common.sql.hooks.sql import DbApiHook
import clickhouse_connect.dbapi as clickhouse_dbapi
from clickhouse_connect.dbapi import connect # type: ignore
from clickhouse_connect.dbapi.connection import Connection
from clickhouse_connect.driver import httputil # type: ignore
import clickhouse_connect.driver
from clickhouse_connect.driver import client


class SQLMeshClickHouseHook(DbApiHook):
"""
Uses the ClickHouse Python DB API connector.
"""

conn_name_attr = "sqlmesh_clickhouse_conn_id"
default_conn_name = "sqlmesh_clickhouse_default"
conn_type = "sqlmesh_clickhouse"
hook_name = "SQLMesh ClickHouse"
connector = clickhouse_connect

def get_conn(self) -> Connection:
"""Returns a Redshift connection object"""
db = self.get_connection(getattr(self, t.cast(str, self.conn_name_attr)))

return connect(
host=db.host,
port=db.port,
user=db.login,
password=db.password,
database=db.schema,
**db.extra_dejson,
)
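
For reviewers who want to exercise the hook outside of a DAG, here is a minimal sketch. It assumes SQLMesh is installed with its Airflow extras, a ClickHouse server is reachable over HTTP, and Airflow's standard AIRFLOW_CONN_<CONN_ID> environment-variable lookup is used; the host, credentials, and database below are placeholders, not part of this change.

import os

# Placeholder connection; host, credentials, and database are illustrative only.
os.environ["AIRFLOW_CONN_SQLMESH_CLICKHOUSE_DEFAULT"] = (
    "sqlmesh-clickhouse://default:secret@localhost:8123/analytics"
)

from sqlmesh.schedulers.airflow.hooks.clickhouse import SQLMeshClickHouseHook

hook = SQLMeshClickHouseHook()  # falls back to sqlmesh_clickhouse_default
conn = hook.get_conn()          # clickhouse_connect DB API connection
cursor = conn.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())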
32 changes: 32 additions & 0 deletions sqlmesh/schedulers/airflow/operators/clickhouse.py
@@ -0,0 +1,32 @@
from __future__ import annotations

import typing as t


from sqlmesh.schedulers.airflow.hooks.clickhouse import SQLMeshClickHouseHook
from sqlmesh.schedulers.airflow.operators.base import BaseDbApiOperator
from sqlmesh.schedulers.airflow.operators.targets import BaseTarget


class SQLMeshClickHouseOperator(BaseDbApiOperator):
"""The operator that evaluates a SQLMesh model snapshot on a ClickHouse target
Args:
target: The target that will be executed by this operator instance.
postgres_conn_id: The Airflow connection id for the postgres target.
"""

def __init__(
self,
*,
target: BaseTarget,
clickhouse_conn_id: str = SQLMeshClickHouseHook.default_conn_name,
**kwargs: t.Any,
) -> None:
super().__init__(
target=target,
conn_id=clickhouse_conn_id,
dialect="clickhouse",
hook_type=SQLMeshClickHouseHook,
**kwargs,
)
5 changes: 5 additions & 0 deletions sqlmesh/schedulers/airflow/util.py
@@ -122,6 +122,11 @@ def discover_engine_operator(name: str, sql_only: bool = False) -> t.Type[BaseOp
name = name.lower()

try:
        if name == "clickhouse":
            from sqlmesh.schedulers.airflow.operators.clickhouse import (
                SQLMeshClickHouseOperator,
            )

            return SQLMeshClickHouseOperator
if name == "spark":
from sqlmesh.schedulers.airflow.operators.spark_submit import (
SQLMeshSparkSubmitOperator,
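
Lastly, a small sketch of the dispatch this hunk adds to discover_engine_operator, again assuming SQLMesh is installed so the module imports cleanly:

from sqlmesh.schedulers.airflow.util import discover_engine_operator

# The helper lower-cases the engine name before matching, so any casing works.
operator_cls = discover_engine_operator("ClickHouse")
assert operator_cls.__name__ == "SQLMeshClickHouseOperator"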
