Skip to content

Commit

Permalink
Merge pull request #159 from dlt-hub/rfix/fixes-duckdb-location
Browse files Browse the repository at this point in the history
fixes duckdb database default location
  • Loading branch information
rudolfix authored Feb 28, 2023
2 parents b43edcf + 85f2557 commit fde4642
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 63 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ experiments/*
secrets.toml
*.session.sql
*.duckdb
*.wal

# Byte-compiled / optimized / DLL files
**/__pycache__/
Expand Down
6 changes: 6 additions & 0 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class SupportsPipeline(Protocol):
def state(self) -> TPipelineState:
"""Returns dictionary with pipeline state"""

def set_local_state_val(self, key: str, value: Any) -> None:
    """Sets `value` under `key` in the local pipeline state.

    Local state is not synchronized with the destination.
    """

def get_local_state_val(self, key: str) -> Any:
    """Gets the value stored under `key` in the local pipeline state.

    Local state is not synchronized with the destination.
    """

def run(
self,
data: Any = None,
Expand Down
46 changes: 38 additions & 8 deletions dlt/destinations/duckdb/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.typing import DictStrAny, TSecretValue

# NOTE(review): this span is a flattened diff hunk — the first assignment below
# is the pre-change line and is immediately superseded by the templated form.
DEFAULT_DUCK_DB_NAME = "quack.duckdb"
# Template for per-pipeline database file names, e.g. "chess.duckdb".
DUCK_DB_NAME = "%s.duckdb"
# Default database file name used when no pipeline name is available.
DEFAULT_DUCK_DB_NAME = DUCK_DB_NAME % "quack"
# Key under which the absolute database path is stored in the pipeline's
# local (non-synchronized) state.
LOCAL_STATE_KEY = "duckdb_database"


@configspec
Expand All @@ -25,7 +27,7 @@ class DuckDbCredentials(ConnectionStringCredentials):

# __config_gen_annotations__: ClassVar[List[str]] = ["database"]

def borrow_conn(self, read_only: bool, config: DictStrAny = None) -> Any:
def borrow_conn(self, read_only: bool) -> Any:
import duckdb

if not hasattr(self, "_conn_lock"):
Expand All @@ -34,7 +36,7 @@ def borrow_conn(self, read_only: bool, config: DictStrAny = None) -> Any:
# obtain a lock because duck releases the GIL and we have refcount concurrency
with self._conn_lock:
if not hasattr(self, "_conn"):
self._conn = duckdb.connect(database=self.database, read_only=read_only, config=config)
self._conn = duckdb.connect(database=self.database, read_only=read_only)
self._conn_owner = True
self._conn_borrows = 0

Expand Down Expand Up @@ -70,31 +72,59 @@ def parse_native_representation(self, native_value: Any) -> None:
try:
super().parse_native_representation(native_value)
except InvalidConnectionString:
if is_valid_filepath(native_value, platform="auto"):
if native_value == ":pipeline:" or is_valid_filepath(native_value, platform="auto"):
self.database = native_value
else:
raise

# NOTE(review): this span is a flattened diff hunk — removed (pre-change) lines
# are interleaved with their replacements and the original indentation was lost
# in extraction. Code lines are kept byte-identical; only comments were added.
def on_resolved(self) -> None:
# NOTE(review): the next two lines appear to be the removed pre-change guard,
# replaced by the ":external:"/":pipeline:" handling below — confirm against VCS.
# if database is not set, try the pipeline context
if not self.database:
# do not set any paths for external database
if self.database == ":external:":
return
# try the pipeline context
if self.database == ":pipeline:":
self.database = self._path_in_pipeline(DEFAULT_DUCK_DB_NAME)
# if pipeline context was not present
# if pipeline context was not present or database was not set
if not self.database:
# create database locally
# NOTE(review): the literal-default assignment below looks like the removed
# line; the `_path_from_pipeline` call is presumably its replacement.
self.database = DEFAULT_DUCK_DB_NAME
self.database = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME)
# always make database an abs path
self.database = os.path.abspath(self.database)
self._path_to_pipeline(self.database)

def _path_in_pipeline(self, rel_path: str) -> str:
    """Resolve `rel_path` against the active pipeline's working directory.

    Returns None when no pipeline context is active.
    """
    from dlt.common.configuration.container import Container
    from dlt.common.pipeline import PipelineContext

    ctx = Container()[PipelineContext]
    if not ctx.is_active():
        return None
    # pipeline is active: place the database inside its working directory
    return os.path.join(ctx.pipeline().working_dir, rel_path)


def _path_to_pipeline(self, abspath: str) -> None:
    """Remember `abspath` in the active pipeline's local state, if any.

    No-op when no pipeline context is active.
    """
    from dlt.common.configuration.container import Container
    from dlt.common.pipeline import PipelineContext

    ctx = Container()[PipelineContext]
    if not ctx.is_active():
        return
    ctx.pipeline().set_local_state_val(LOCAL_STATE_KEY, abspath)

def _path_from_pipeline(self, default_path: str) -> str:
    """Return the database path remembered in the active pipeline's local state.

    When the state holds no entry, falls back to a name derived from the
    pipeline name; when no pipeline context is active, returns `default_path`.
    """
    from dlt.common.configuration.container import Container
    from dlt.common.pipeline import PipelineContext

    ctx = Container()[PipelineContext]
    if ctx.is_active():
        try:
            pipeline = ctx.pipeline()
            # derive the fallback from the pipeline name before the lookup
            default_path = DUCK_DB_NAME % pipeline.pipeline_name
            return pipeline.get_local_state_val(LOCAL_STATE_KEY)  # type: ignore
        except KeyError:
            # nothing stored yet — use the derived fallback
            pass
    return default_path

def _delete_conn(self) -> None:
# print("Closing conn because is owner")
self._conn.close()
Expand Down
25 changes: 24 additions & 1 deletion dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dlt.common.configuration import inject_section, known_sections
from dlt.common.configuration.specs import RunConfiguration, NormalizeVolumeConfiguration, SchemaVolumeConfiguration, LoadVolumeConfiguration, PoolRunnerConfiguration
from dlt.common.configuration.container import Container
from dlt.common.configuration.exceptions import ConfigFieldMissingException
from dlt.common.configuration.exceptions import ConfigFieldMissingException, ContextDefaultCannotBeCreated
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.exceptions import MissingDependencyException
from dlt.common.normalizers import default_normalizers, import_normalizers
Expand Down Expand Up @@ -573,6 +573,29 @@ def sync_schema(self, schema_name: str = None, credentials: Any = None) -> None:
client.initialize_storage()
client.update_storage_schema()


def set_local_state_val(self, key: str, value: Any) -> None:
    """Sets value in local state. Local state is not synchronized with destination."""
    try:
        # a managed (read/write) state is injected while a pipeline step runs
        managed = self._container[StateInjectableContext].state
        managed["_local"][key] = value  # type: ignore
    except ContextDefaultCannotBeCreated:
        # no managed state: load, mutate and persist the stored state explicitly
        stored = self._get_state()
        stored["_local"][key] = value  # type: ignore
        self._save_state(stored)


def get_local_state_val(self, key: str) -> Any:
    """Gets value from local state. Local state is not synchronized with destination."""
    try:
        # prefer the managed (injected) state when a pipeline step is running
        current = self._container[StateInjectableContext].state
    except ContextDefaultCannotBeCreated:
        # otherwise read the state stored in the working directory
        current = self._get_state()
    local_section = current["_local"]  # type: ignore
    return local_section[key]


def sql_client(self, schema_name: str = None, credentials: Any = None) -> SqlClientBase[Any]:
"""Returns a sql connection configured to query/change the destination and dataset that were used to load the data."""
# if not self.default_schema_name:
Expand Down
4 changes: 2 additions & 2 deletions docs/website/docs/destinations.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ python3 chess.py

### Destination Configuration

By default, a DuckDB database will be created inside the pipeline working directory with a name `quack.duckdb`. It is available in `read/write` mode via `pipeline.sql_client`.
By default, a DuckDB database will be created in the current working directory with the name `<pipeline_name>.duckdb` (`chess.duckdb` in the example above). After loading, it is available in `read/write` mode via `with pipeline.sql_client() as con:`, which is a wrapper over `DuckDBPyConnection`. See the [duckdb docs](https://duckdb.org/docs/api/python/overview#persistent-storage) for details.

As the `duckdb` credentials do not require any secret values, you are free to pass the configuration explicitly via the `credentials` parameter to `dlt.pipeline` or `pipeline.run` methods. For example:
The `duckdb` credentials do not require any secret values. You are free to pass the configuration explicitly via the `credentials` parameter to `dlt.pipeline` or `pipeline.run` methods. For example:
```python
# will load data to files/data.db database file
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', full_refresh=False, credentials="files/data.db")
Expand Down
Loading

0 comments on commit fde4642

Please sign in to comment.