Skip to content

Commit

Permalink
prefers existing sections in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Sep 26, 2024
1 parent 3916412 commit 888c665
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,17 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
return decorator


def with_config_section(sections: Tuple[str, ...]) -> Callable[[TFun], TFun]:
def with_config_section(
sections: Tuple[str, ...], merge_func: ConfigSectionContext.TMergeFunc = None
) -> Callable[[TFun], TFun]:
def decorator(f: TFun) -> TFun:
@wraps(f)
def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
# add section context to the container to be used by all configuration without explicit sections resolution
with inject_section(
ConfigSectionContext(pipeline_name=self.pipeline_name, sections=sections)
ConfigSectionContext(
pipeline_name=self.pipeline_name, sections=sections, merge_style=merge_func
)
):
return f(self, *args, **kwargs)

Expand Down Expand Up @@ -712,7 +716,7 @@ def run(
else:
return None

@with_config_section(())
@with_config_section(sections=None, merge_func=ConfigSectionContext.prefer_existing)
def sync_destination(
self,
destination: TDestinationReferenceArg = None,
Expand Down Expand Up @@ -980,7 +984,7 @@ def get_local_state_val(self, key: str) -> Any:
state = self._get_state()
return state["_local"][key] # type: ignore

@with_config_section(())
@with_config_section(sections=None, merge_func=ConfigSectionContext.prefer_existing)
def sql_client(self, schema_name: str = None) -> SqlClientBase[Any]:
"""Returns a sql client configured to query/change the destination and dataset that were used to load the data.
Use the client with `with` statement to manage opening and closing connection to the destination:
Expand Down Expand Up @@ -1020,7 +1024,7 @@ def _fs_client(self, schema_name: str = None) -> FSClientBase:
return client
raise FSClientNotAvailable(self.pipeline_name, self.destination.destination_name)

@with_config_section(())
@with_config_section(sections=None, merge_func=ConfigSectionContext.prefer_existing)
def destination_client(self, schema_name: str = None) -> JobClientBase:
"""Get the destination job client for the configured destination
Use the client with `with` statement to manage opening and closing connection to the destination:
Expand Down

0 comments on commit 888c665

Please sign in to comment.