Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: hints are retained across different yields #2109

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from

Conversation

joscha
Copy link
Contributor

@joscha joscha commented Nov 28, 2024

Description

I noticed in a real-life environment that mixing yields without markers and markers sometimes pushes data into the wrong table.
I was able to create a test case that reproduces it.

The expected outcome is 4 tables, looking like this:

things:

id
1000
2000

things_a:

id my_resource_id value
a 1000 1
a 2000 4

things_b:

id my_resource_id value
b 1000 2
b 2000 5

things_c:

id my_resource_id value
c 1000 3
c 2000 6

but instead things_c receives an additional item intended for the non-marked table things, so it looks like this:

things_c:

id my_resource_id value
c 1000 3
2000 None None
c 2000 6

Copy link

netlify bot commented Nov 28, 2024

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit e1cbe19
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/675aef9b0a130d0008d16241

@sh-rp
Copy link
Collaborator

sh-rp commented Dec 2, 2024

It looks like the table_name of the most recent dlt mark is retained during extraction. I'll need to verify if this is the correct behavior or wether we would consider this a bug. If you explicitely mark every row, then it will work.

@joscha
Copy link
Contributor Author

joscha commented Dec 2, 2024

is retained during extraction

yes! Can you point me to the code where that happens? I searched and couldn't find it.

I'll need to verify if this is the correct behavior

I most definitely do not expect it and the examples in the documentation don't mention it either.
It also only happens on the very last insert. E.g. on Import of Y main rows with Y*X marked rows for other tables only the very last yield of Y will end up incrorrectly in the previously as X marked table.

If you explicitely mark every row, then it will work.

It doesn't actually, I believe. I have tried this as the first attempt and I remember it not working, however figuring out where that row came from drove me a bit nuts, so I tried a lot of things and not very methodical. I will add a second test case to this pull request with an explicit mark for the parent resource.

@sh-rp
Copy link
Collaborator

sh-rp commented Dec 2, 2024

If you replace yield my_resource with:

            yield dlt.mark.with_hints(
                    item=my_resource,
                    hints=dlt.mark.make_hints(
                        table_name="things",
                    )
                )

you will get the expected result.

For where this problem originates this will be in the normalizer or the pipe iterator. If you can have a stab at it. We are currently very busy but should come back to this shortly, but you can always ask for help here :)

@sh-rp
Copy link
Collaborator

sh-rp commented Dec 2, 2024

PS: Yes, we consider this a bug.

@sh-rp sh-rp added the bug Something isn't working label Dec 2, 2024
@joscha
Copy link
Contributor Author

joscha commented Dec 2, 2024

you will get the expected result.

just added a second parameter to the test to make sure it works as intended: 3cff1a3 - it does, so \o/

this will be in the normalizer or the pipe iterator.

OK, will have a short look if I can find it.

@joscha
Copy link
Contributor Author

joscha commented Dec 2, 2024

things {'id': 1000}
things_a {'my_resource_id': 1000, 'id': 'a', 'value': 1}
things_b {'my_resource_id': 1000, 'id': 'b', 'value': 2}
things_c {'my_resource_id': 1000, 'id': 'c', 'value': 3}
None
things_c {'id': 2000} <---- HERE IS THE ISSUE, this should be `things`
things_a {'my_resource_id': 2000, 'id': 'a', 'value': 4}
things_b {'my_resource_id': 2000, 'id': 'b', 'value': 5}
things_c {'my_resource_id': 2000, 'id': 'c', 'value': 6}

via

diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index 41d3035a..c62fa44e 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -135,6 +135,7 @@ class Extractor:
 
         if table_name := self._get_static_table_name(resource, meta):
             # write item belonging to table with static name
+            print(table_name, items)
             self._write_to_static_table(resource, table_name, items, meta)
         else:
             # table has name or other hints depending on data items
@@ -148,8 +149,10 @@ class Extractor:

shows that in subsequent runs after setting a hint on an item, the resource seems to store it.

The reason it stores it, is this call to merge_hints:

resource.merge_hints(meta.hints, meta.create_table_variant)

merge_hints calls resource.apply_hints, which then mutates the underlying resource. Any subsequent writes afterwards that do NOT bring it's own meta (e.g. table_name is empty) , will then fall back to the meta of the resource in

table_name = resource.table_name # type: ignore[assignment]
and 💥 .

I believe this would possibly not only cause issues for table names, but also for all other metadata, if any of it would divert from what's on the resource. table_name is just the most obvious one.

There are various fixes that could be done, but I am unsure if there are any backwards-compatible ones if anyone is relying on this not very sane behavior, as someone could have used this as a feature like:

yield item1
yield item2
yield dlt.mark.with_hints(item3, hints=dlt.mark.make_hints(table_name="xxx"))
yield item4
yield item5
yield item...

whoch would have yielded 2 items into the table defined in the resource and then all other subsequent items into table xxx.

Generally, I think I'd make sure that yielding an item does not at all overwrite the resource, e.g. we need to get rid of the call to apply_hints, but it feels like such a big bag of options that if we were to change this behaviour we're almost certain to break something. Alternatively we could only remove the overwrite for the table_name, which would make sure that items land in the correct table, but any write_dispositions, etc. would still leak on the global resource state, which seems wrong. If the data yielded in a resource is non-deterministic, you'd end up with a different set of tables & items, merged/replaced, different metadata on the table, different columns, etc. each run - surely that would not be expected.

@joscha
Copy link
Contributor Author

joscha commented Dec 2, 2024

Something like this also fails:

@dlt.transformer(
    table_name=lambda item: "xxxx"
)
def xxxx() -> Iterable[TDataItem]:
    yield {"id": 1}
    yield dlt.mark.with_hints(
        item={"id": 1000},
        hints=dlt.mark.make_hints(
            table_name="yyyy",
            write_disposition="merge",
            primary_key="id",
        ),
    )

with:

A set of table hints provided to the resource is inconsistent: Table name yyyy must be a function if any other table hint is a function

for the same reason - it tries to merge the hitns for the sub-table (yyyy) into the main resource and then 💥 because the main resource has a dynamic table name.
Workaround is table_name=lambda item: "yyyy" but that's besides the point; it's the same merge that causes the issue.
I found this whilst trying to make the main table have dynamic columns. Unless I mark all columns in all hinted tables as dynamic as well, it won't allow it.

@sh-rp sh-rp self-assigned this Dec 5, 2024
@sh-rp
Copy link
Collaborator

sh-rp commented Dec 5, 2024

@joscha thanks for the investigation, this makes sense! I have assigned the task to myself and hope I'll get to fixing it next week. Keep an eye on the pr :)

@joscha
Copy link
Contributor Author

joscha commented Dec 9, 2024

I have assigned the task to myself

Do you have time to give me guidance towards the right solution? Then I'd give it a stab!

@joscha joscha changed the title bug: data yields into incorrect table when nesting manually bug: hints are retained across different marked yields Dec 11, 2024
@joscha
Copy link
Contributor Author

joscha commented Dec 11, 2024

I found another issue relating to columns, which is the same codepath:

@dlt.resource()
def my_resource():
    yield dlt.mark.with_hints(
                            item={"hello": "..."},
                            hints=dlt.mark.make_hints(
                                table_name="a",
                                columns={
                                    "hello": {
                                        "data_type": "text"
                                    }
                                }
                            ),
                        )
    yield dlt.mark.with_hints(
                            item={"world": "..."},
                            hints=dlt.mark.make_hints(
                                table_name="b",
                            ),
                        )

should create:

Table: a

hello
...

Table: b

world
...

but creates:

Screenshot 2024-12-11 at 2 19 05 pm

(the column definition is retained on the pipeline for subsequent runs until another hint overwrites it.

@joscha joscha changed the title bug: hints are retained across different marked yields bug: hints are retained across different yields Dec 11, 2024
@joscha
Copy link
Contributor Author

joscha commented Dec 11, 2024

@sh-rp this bug makes dlt almost unusable when trying to yield data into different tables, I'd love to work towards a fix, can I assume that we agree this is a regression and hints should not be shared across different yields at all, e.g. the pipeline state must not be tainted by whatever hint is passed with a marked yield?

@rudolfix
Copy link
Collaborator

@joscha we are a little in the area of undefined behavior. try to create table variants if you use dynamic table name (maybe we should set this automatically and for sure we should document it)

def with_hints(
    item: TDataItems, hints: TResourceHints, create_table_variant: bool = False
) -> DataItemWithMeta:
    """Marks `item` to update the resource with specified `hints`.

    Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set.

    Create `TResourceHints` with `make_hints`.
    Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name`
    """
    return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)

@rudolfix
Copy link
Collaborator

regarding the state: all the tables are generated by a single resource and will be marked as such. so they share a single state

@joscha
Copy link
Contributor Author

joscha commented Dec 12, 2024

I mark per yielded entity though, it is not clear from the API that I mutate a global or resource -level state with the way hints are currently done. If the marking API was on the resource, then maybe, however this would not do well with parallel and async generators as you could not guarantee that a state mutation on the resource will result in a yield being processed with the correct metadata. The only clean way I can see to fix this is to allow both: resource-level hints which are retained throughout a resource execution and item-level hints which are used when processing the yielded entity but do not mutate the resource state by itself.
Whether item-level hints and resource hints are merged or not is a question of how we define it. For better backwards compatibility they would probably be merged, although it doesn't make immediate logical sense if I were to architect this feature from scratch.

@rudolfix
Copy link
Collaborator

@joscha what happens if you do

@dlt.resource()
def my_resource():
    yield dlt.mark.with_hints(
                            item={"hello": "..."},
                            hints=dlt.mark.make_hints(
                                table_name="a",
                                create_table_variant=true,
                                columns={
                                    "hello": {
                                        "data_type": "text"
                                    }
                                }
                            ),
                        )
    yield dlt.mark.with_hints(
                            item={"world": "..."},
                            hints=dlt.mark.make_hints(
                                table_name="b",
                                create_table_variant=true,
                            ),
                        )

are you getting tables in desired shape? I think if we always create a variant when table_name and any other hint is specified, then most of your confusion is (hopefully) gone.

state (ie. incremental) is managed on the resource level. table names are just hints and data routing during loading... for users that want to maintain state per table: the resource state dictionary is available for any custom work.

note that refreshing multi table resource drops all tables produced by it

@joscha
Copy link
Contributor Author

joscha commented Dec 12, 2024

@joscha what happens if you do

Still running; So far the only thing that I see is a constant stream of:

2024-12-12 13:18:11,692|[WARNING]|72231|8212723776|dlt|hints.py|_set_hints:389|A data item validator was created from column schema in companies for a table `<some_table>` variant. Currently such validator is ignored.

with <some_table> being one of the names of my dynamic tables.

Will report back once the run is done.

Doesn't seem to be an easy fix, the run failed with:

<class 'dlt.common.schema.exceptions.UnboundColumnException'>
In schema: affinity: The column id in table dropdown_options_field_1173734 did not receive any data during this load. It is marked as non-nullable merge key and it must have values. This can happen if you specify the column manually, for example using the 'merge_key', 'primary_key' or 'columns' argument but it does not exist in the data.

The diff is:

diff --git a/sources/affinity/__init__.py b/sources/affinity/__init__.py
index a76ea177..7d9c9747 100644
--- a/sources/affinity/__init__.py
+++ b/sources/affinity/__init__.py
@@ -65,8 +65,7 @@ def __create_id_resource(entity: ENTITY | LISTS_LITERAL, is_id_generator: bool =
     @dlt.resource(
         write_disposition="replace",
         primary_key="id",
-        # can't use this yet, due to https://github.com/dlt-hub/dlt/pull/2109
-        # columns=datacls,
+        columns=datacls,
         name=name,
         parallelized=True
     )
@@ -121,18 +120,9 @@ def mark_dropdown_item(dropdown_item: Dropdown | RankedDropdown, field: FieldMod
                                 write_disposition="merge",
                                 primary_key="id",
                                 merge_key="id",
-                                # can't use this yet, due to https://github.com/dlt-hub/dlt/pull/2109
-                                # columns={
-                                #     "id": {
-                                #         "primary_key": True,
-                                #         "unique": True,
-
-                                #     },
-                                #     "text": {
-                                #         "data_type": "text"
-                                #     }
-                                # }
+                                columns=type(dropdown_item)
                             ),
+                            create_table_variant=True,
                         )
 
 
@@ -147,7 +137,8 @@ def process_and_yield_fields(entity: Company | Person | OpportunityWithFields, r
                 write_disposition="merge",
                 primary_key="id",
                 merge_key="id",
-            )
+            ),
+            create_table_variant=True,
         )
         new_column = f"{field.id}_{field.name}" if field.id.startswith("field-") else field.id
         value = field.value.root
@@ -181,6 +172,7 @@ def process_and_yield_fields(entity: Company | Person | OpportunityWithFields, r
                         write_disposition="merge",
                         primary_key="id",
                     ),
+                    create_table_variant=True,
                 )
             case PersonValue() | CompanyValue():
                 ret[new_column] = value.data.id if value.data else None

a run without this diff succeeds.
I added create_table_variant=True to all calls to dlt.mark.with_hints and reenabled the columns (which I had to disable, due to different columns getting added to random tables).
The whole code is here: dlt-hub/verified-sources@master...joscha:verified-sources:joscha/affinity-source - there is some noise in that diff though, as it is WIP, but the code we're talking about is here: https://github.com/joscha/verified-sources/blob/e6fb832dba5db983e57b2d780852d2c73f7886da/sources/affinity/__init__.py

@joscha
Copy link
Contributor Author

joscha commented Dec 12, 2024

@joscha what happens if you do
[...]
create_table_variant=True,

I added another parametrized test in e1cbe19.
Result is:

tests/pipeline/test_pipeline.py::test_columns_do_not_linger[True] {'a': [{'hello': '...'}], 'b': [{'world': '...'}]}
PASSED
tests/pipeline/test_pipeline.py::test_columns_do_not_linger[False] {'a': [{'hello': '...'}], 'b': [{'hello': None, 'world': '...'}]}
FAILED

so create_table_variant=True seems to alleviate the issue with the columns at the very least.

@joscha
Copy link
Contributor Author

joscha commented Dec 15, 2024

I've been running this a while with table variants and I believe it is working as expected.
We definitely need to make

A data item validator was created from column schema in companies for a table <some_table> variant. Currently such validator is ignored.

configurable, so it's usually suppressed.

Update: see example here: https://github.com/planet-a-ventures/dlt-source-affinity/blob/1f5ec2b7cf39d8a59f46149899d8818a50e0c123/affinity/__init__.py#L33-L45 on how to hide this message when using variants with Pydantic column definitions.

Also, I still think a fix is needed for the standard case. Or at the very very least extensive documentation both markdown and code around the current behavior.

@sh-rp
Copy link
Collaborator

sh-rp commented Dec 18, 2024

Hey @joscha, to summarize what the current state of this discussion is.

  • We need to make sure that warnings don't pile up over and over again (you can create a ticket for this)
  • We need to document the table variants stuff (this could also go into a ticket with an example of this case)

Is there anything else?

@joscha
Copy link
Contributor Author

joscha commented Dec 18, 2024

Is there anything else?

Only one extra item comes to mind: if we agree that yielding items with hints without create_table_variant=True mutates the resource state is not a bug, we should probably also add a couple tests in regards to it that assert this explicitly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants