From b2639003e3fc025d52847daaf97e0ff18edcd11a Mon Sep 17 00:00:00 2001 From: Umar Ahmed Date: Thu, 3 Oct 2024 00:30:17 +0000 Subject: [PATCH] Add engine_adapter_callback --- dlt/sources/sql_database/__init__.py | 13 ++++++++++++- dlt/sources/sql_database/helpers.py | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/dlt/sources/sql_database/__init__.py b/dlt/sources/sql_database/__init__.py index f7c83b4b80..8d1570b600 100644 --- a/dlt/sources/sql_database/__init__.py +++ b/dlt/sources/sql_database/__init__.py @@ -19,6 +19,7 @@ SqlTableResourceConfiguration, _detect_precision_hints_deprecated, TQueryAdapter, + TEngineAdapter, ) from .schema_types import ( default_table_adapter, @@ -45,6 +46,7 @@ def sql_database( include_views: bool = False, type_adapter_callback: Optional[TTypeAdapter] = None, query_adapter_callback: Optional[TQueryAdapter] = None, + engine_adapter_callback: Optional[TEngineAdapter] = None, ) -> Iterable[DltResource]: """ A dlt source which loads data from an SQL database using SQLAlchemy. @@ -90,8 +92,12 @@ def sql_database( # set up alchemy engine engine = engine_from_credentials(credentials) engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) - metadata = metadata or MetaData(schema=schema) + if engine_adapter_callback: + engine_adapter_callback(engine) + + metadata = metadata or MetaData(schema=schema) + # use provided tables or all tables if table_names: tables = [ @@ -138,6 +144,7 @@ def sql_table( type_adapter_callback: Optional[TTypeAdapter] = None, included_columns: Optional[List[str]] = None, query_adapter_callback: Optional[TQueryAdapter] = None, + engine_adapter_callback: Optional[TEngineAdapter] = None, ) -> DltResource: """ A dlt resource which loads data from an SQL database table using SQLAlchemy. @@ -182,6 +189,10 @@ def sql_table( engine = engine_from_credentials(credentials, may_dispose_after_use=True) engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) + + if engine_adapter_callback: + engine_adapter_callback(engine) + metadata = metadata or MetaData(schema=schema) table_obj = metadata.tables.get("table") or Table( diff --git a/dlt/sources/sql_database/helpers.py b/dlt/sources/sql_database/helpers.py index 1d758fe882..cd66e93953 100644 --- a/dlt/sources/sql_database/helpers.py +++ b/dlt/sources/sql_database/helpers.py @@ -37,6 +37,7 @@ TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] TQueryAdapter = Callable[[SelectAny, Table], SelectAny] +TEngineAdapter = Callable[[Engine], Engine] class TableLoader: