
dataset factory #1945

Merged
sh-rp merged 13 commits into devel from feat/1943-dataset-factory on Oct 15, 2024

Conversation

sh-rp (Collaborator) commented Oct 10, 2024

Description

This PR is an example implementation of a dataset factory to build datasets without a pipeline object.
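
Below is a minimal usage sketch of the factory, based on the dataset() signature and the test code in this PR; the duckdb destination and the "items" table name are illustrative, and passing no schema triggers autodiscovery of the newest schema stored on the destination.

    import dlt

    # build a dataset directly from a destination, without a pipeline object;
    # no schema given, so the newest stored schema is autodiscovered
    dataset = dlt.dataset(
        destination="duckdb",
        dataset_name="my_dataset",
    )

    # relations are accessed and executed the same way as with pipeline._dataset()
    items_relation = dataset["items"]
    rows = items_relation.fetchall()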

netlify bot commented Oct 10, 2024

Deploy Preview for dlt-hub-docs canceled.
Latest commit: d367c68
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/670d2d63ec07160008d09ce1

f" ORDER BY {c_inserted_at} DESC;"
)
return self._row_to_schema_info(query, self.schema.name)
if any_schema_name:
sh-rp (Collaborator, Author) commented:

this can be compressed and also needs to be implemented for all destinations

sh-rp (Collaborator, Author) commented:

Also: is this signature ok, or do we want to add a new function for this? I'm also not sure about this "any_schema_name"... :)

@@ -212,6 +212,47 @@ def double_items():
loads_table = pipeline._dataset()[pipeline.default_schema.loads_table_name]
loads_table.fetchall()

# check dataset factory
sh-rp (Collaborator, Author) commented:

we need proper tests to ensure that the newest schema is always actually selected; this is just basic test code to make sure it generally works

def dataset(
destination: TDestinationReferenceArg,
dataset_name: str,
schema: Union[Schema, str, None] = None,
sh-rp (Collaborator, Author) commented:

We allow either a given Schema object, a schema name (in which case the schema will be loaded from the destination), or no schema at all, which triggers the autodiscovery as discussed.
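
A hedged sketch of the three modes, assuming the signature above; the duckdb destination and the schema names are illustrative.

    import dlt
    from dlt.common.schema import Schema

    # 1. pass an explicit Schema object
    ds = dlt.dataset(destination="duckdb", dataset_name="my_dataset", schema=Schema("my_schema"))

    # 2. pass a schema name: the newest stored schema with that name is loaded from the destination
    ds = dlt.dataset(destination="duckdb", dataset_name="my_dataset", schema="my_schema")

    # 3. pass nothing: the newest schema found on the destination is autodiscovered
    ds = dlt.dataset(destination="duckdb", dataset_name="my_dataset")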

Collaborator commented:

OK cool. but as discussed we'll need to implement dataset compatible with pipeline dataset (many schemas, different database layout: we support schema separation but it is rarely used)

dlt/__init__.py (outdated comment, resolved)
sh-rp marked this pull request as ready for review on October 10, 2024 17:11
rudolfix (Collaborator) left a comment:

this is good! I think there's a big overlap between the dataset and the part of the pipeline that does the same:

  • keeping destinations (also staging - IMO you should include it as optional; sometimes we'll use e.g. Athena to do Iceberg but actually open the staging filesystem as a data lake)
  • keeping a list of schemas on the dataset
  • initializing configs, exposing various clients

do you think it makes sense to refactor pipeline right now?

do you think we could keep a dataset instance in the pipeline and just expose some methods from it?

the standalone part looks good. I'm not sure if we should go for a single-schema or a many-schemas dataset?
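
A rough, purely illustrative sketch of the delegation idea raised above (not code from this PR; all names are hypothetical): the pipeline keeps a dataset instance and forwards dataset access to it instead of re-implementing schema and client handling.

    import dlt

    class PipelineWithDataset:
        # hypothetical wrapper illustrating the refactoring direction
        def __init__(self, destination, dataset_name, schema=None):
            self._dataset = dlt.dataset(
                destination=destination,
                dataset_name=dataset_name,
                schema=schema,
            )

        def dataset(self):
            # pipeline._dataset() would simply delegate to the owned dataset
            return self._dataset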

rudolfix (Collaborator) left a comment:

I'd change the WithState interface to be more explicit and also add schema tests for the filesystem

dlt/__init__.py (outdated comment, resolved)
@@ -657,8 +659,8 @@ def __exit__(

class WithStateSync(ABC):
    @abstractmethod
    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
        """Retrieves newest schema from destination storage"""
    def get_stored_schema(self, any_schema_name: bool = False) -> Optional[StorageSchemaInfo]:
Collaborator commented:

I'd rather add a new method, but it really does not fit here. This interface assumes that there's a known schema name.

my take would be to change the signature to

get_stored_schema(self, schema_name: str = None)

if None is specified, we load the newest schema; if a name is provided, we load the newest schema with the given name

sh-rp (Collaborator, Author) commented Oct 14, 2024:

Sounds good, I had the same idea but thought it might not be good to change the default behavior of this method. I have changed it now and updated all the places in the code and tests where it is used.
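
A minimal sketch of the resulting interface, assuming the same Optional return type as the original abstract method; the docstring wording is illustrative.

    from abc import ABC, abstractmethod
    from typing import Optional

    class WithStateSync(ABC):
        @abstractmethod
        def get_stored_schema(self, schema_name: Optional[str] = None) -> Optional["StorageSchemaInfo"]:
            """Retrieve the newest stored schema; if schema_name is given,
            the newest stored schema with that name."""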

tests/load/test_job_client.py (comment thread resolved)
dataset = dlt.dataset(
destination=destination_for_dataset,
dataset_name=pipeline.dataset_name,
schema="wrong_schema_name",
Collaborator commented:

we allow a new schema to be added right? that's why we do not raise when schema is not known? we'll need a better method of adding schemas to a dataset. also to sync schemas etc.

sh-rp (Collaborator, Author) commented:

What actually happens here is that an empty schema is created as a stand-in; it does not do anything and also does not get saved anywhere. I can change that if you like, but afaik for now it should be ok.
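
A hedged sketch of the stand-in behavior described above (not the PR's actual code; the lookup helper is hypothetical): when the requested schema name is not found on the destination, an empty Schema with that name is used instead of raising.

    from typing import Dict
    from dlt.common.schema import Schema

    def resolve_schema(stored_schemas: Dict[str, Schema], schema_name: str) -> Schema:
        # hypothetical helper: return the stored schema if it is known,
        # otherwise fall back to an empty stand-in schema that is never persisted
        if schema_name in stored_schemas:
            return stored_schemas[schema_name]
        return Schema(schema_name)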

Collaborator commented:

no, that is OK. I just think that all the code that interacts with the destination and now sits in the pipeline (i.e. schema lists, schema storage etc.) could probably go to Dataset at some point

sh-rp changed the title from "WIP: dataset factory" to "dataset factory" on Oct 14, 2024
rudolfix (Collaborator) left a comment:

LGTM! ready for merge!

sh-rp merged commit bc13448 into devel on Oct 15, 2024
115 of 118 checks passed
sh-rp deleted the feat/1943-dataset-factory branch on October 15, 2024 06:49