Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add 'force_batch_from_source=true' to batch right from the start to allow handling large source and large target tables #697

Open
1 task done
pransito opened this issue Aug 5, 2024 · 14 comments
Labels
enhancement New feature or request

Comments

@pransito
Copy link

pransito commented Aug 5, 2024

Is this your first time submitting a feature request?

  • I have searched the existing issues, and I could not find an existing issue for this feature

Describe the feature

When one has a very large source table and one has a query on that source table that leads to a large target table then one can run into query timeout problems on Athena.

force_batch=true is good for handling many partitions and not run into the limit of 100 open partitions. However, in force_batch=true the first thing that happens is that the model gets written without partitions into a temp table. If that table is very big, that can time out, because that can be a query that means copying in one go many many GB of data.

In that case (or in fact, always) the partitions should be leveraged right from the get go. I.e. each partition should be used right from the start and written into the target (staging) table (using threading).

I believe THIS CODE must be adapted.

I am not sure if force_batch=true should be reworked to show the above behavior always or if a new flag force_batch_from_source=true should be introduced.

Describe alternatives you've considered

No response

Who will this benefit?

This will benefit people that work with very large partitioned Athena tables (100GB plus) that they want to write to another Athena table that can have on top many partitions.

Are you interested in contributing this feature?

Yes.

Anything else?

No response

@pransito pransito added the enhancement New feature or request label Aug 5, 2024
@nicor88
Copy link
Contributor

nicor88 commented Aug 5, 2024

To implement what you suggested, the partitions you are dealing with must be known beforehand, making the implementation hard, hence the solution proposed by the adapter.
The partitions cannot be known beforehand without running the query provided by the user, but if you look at some materialization like likeinsert_by_period, see here the partition is sort of know beforehand allowing a partition iteration/filter in the base user SQL query, therefore I suggest to look into that instead of changing the default adapter behavior.

Using force_batch_from_source might work only when dealing with one table, and might not suit all the use cases, where for example a big table is joined with another smal table (e.g. for enrichment) - and again, I believe that something like insert_by_period might be better if you are dealing with date partitions.

Edit: if we plan to introduce force_batch_from_source (to deal with not partition dates) I suggest to make it as a dict: force_batch_from_source={'enabled': true, 'source_iteration_columns': ['product_id'], batch_size: '30'}
pretty much we could consider to use a dictionary to specifcy how the iteration on those partitions looks like

@pransito
Copy link
Author

pransito commented Aug 6, 2024

@nicor88 Thank you for your comment!

mmmh I wrote the below code to override one of the default macros (create_table_as_with_partitions). And the partitions I know by running the SQL that is supposed to be materialized (with LIMIT 0), which should be less heavy then creating the full table in a temp location, no (get_columns_from_sql)? see here:

{% macro get_columns_from_sql(compiled_sql) %}
    {%- set limited_sql = "WITH subquery AS (" ~ compiled_sql ~ ") SELECT * FROM subquery LIMIT 0" -%}
    {%- set result = run_query(limited_sql) -%}

    {%- if execute %}
        {%- set column_names = [] -%}
        {%- for column in result.columns -%}
            {%- do column_names.append(column.name) -%}
        {%- endfor -%}
        {{ return(column_names) }}
    {%- endif -%}
{% endmacro %}



{% macro create_table_as_with_partitions(temporary, relation, compiled_code, language='sql') -%}

    -- this will override the built-in macro

    {%- do log('!!! HELLO NEW MACRO FOR BATCHING FROM SOURCE !!!: ' ~ relation) -%}

    {% set partitions_batches = get_partition_batches(sql=compiled_code, as_subquery=True) %}
    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

    {%- set dest_columns = get_columns_from_sql(compiled_code) -%}
    {% do log('COLUMNS ARE: ' ~ dest_columns | join(', ')) %}
    {%- set dest_cols_csv = dest_columns | join(', ') -%}
    {% do log('dest_cols_csv is: ' ~ dest_cols_csv) %}

    {%- for batch in partitions_batches -%}
        {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}

        {%- if loop.index == 1 -%}
            {%- set create_target_relation_sql -%}
                select {{ dest_cols_csv }}
                from ({{ compiled_code }})
                where {{ batch }}
            {%- endset -%}
            {%- do run_query(create_table_as(temporary, relation, create_target_relation_sql, language)) -%}
        {%- else -%}
            {%- set insert_batch_partitions_sql -%}
                insert into {{ relation }} ({{ dest_cols_csv }})
                select {{ dest_cols_csv }}
                from ({{ compiled_code }})
                where {{ batch }}
            {%- endset -%}

            {%- do run_query(insert_batch_partitions_sql) -%}
        {%- endif -%}


    {%- endfor -%}

    select 'SUCCESSFULLY CREATED TABLE {{ relation }}'

{%- endmacro %}

What do you think?

Note: what's not nice of this approach is that it is a for loop. Threads are not used. A threaded materialization where all insert into's run in parallel would be nicer. However, the single-threadedness is also the case in the original implementation of create_table_as_with_partitions, correct?

@nicor88
Copy link
Contributor

nicor88 commented Aug 6, 2024

The implementation that you suggested was similar to the original implementation. Then we decided to persist the data in a not partitioned table, and then use that table as a source for the inserts statements run afterwards.

The problem with the solution that you proposed is that the cost will be really high if the batch filter is executed on not partitioned column, causing the same query to run multiple time over and over, scanning the same bytes multiple times. Of course if the same partition used in the partition_by of the target table is the same used in the source table that's different, the partition pruning will work, and less bytes will be scanned.

Said so, we can allow a behavior similar to what you proposed via another flag, but if that's the case I would like to have also control over the name of the source partition column used for filtering - pretty much what was suggested in my message above. NOTE: the current behavior of the materialization MUST be kept for the reason explained above, if a user mistakenly use not partitions columns from the source table that causes high cost.

Regarding the multithreading, I discourage from using multiple inserts concurrently due to Athena limitations on amount of queries per seconds, therefore the current behavior/and the one that we have in the macro can be kept.

@pransito
Copy link
Author

pransito commented Aug 6, 2024

@nicor88 thank you for your response. I understand!

In our case the source table is huge so copying it first to an unpartitioned table (even if filtered to just a few days of data) is not possible. Further we work with a partition projected table as the source which (in most cases) enforces mentioning the partitioning columns in the where clause (and we do). We use a macro which always fill the base filter on the partition projected partitions. But yeah if one could somehow limit the usage of force_batch_from_source=true to only cases where a proper base filtering on partition columns is always done, then it would be safer. Is that what you meant?

Othwerise, insert_by_period is an option, I guess.

@nicor88
Copy link
Contributor

nicor88 commented Aug 6, 2024

What I mean is that the

select {{ dest_cols_csv }}
from ({{ compiled_code }})
where {{ batch }}

reduce the byte scanned if the {{batch}} filter condition acts on the source table partitions. Said so, the {{batch}} filter condition to be effective must be aware of the partitions in the source, and everything gets complicated if the compiled code contains join across tables.

Overall I'm fine to add force_batch_from_source but with this conditions:

  • we need to be sure that we know how to loop from the source table (ideally only one), therefore partitions_batches = get_partition_batches, must be re-worked
  • force_batch_from_source=false keep the orignal adapter behvior that we have at the moment.

Are you up to an alligment call to discuss the above? Looks like you are in Germany like me, we could find a slot. Feel free to reach me out in dbt slack channel to allign on this. Thanks.

@CommonCrisis
Copy link
Contributor

btw maybe this helps: https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main

We are using the insert by period macro cause our source tables are really really big and we need to split the sources for each incremental run.

This macro adds custom filter values to further reduce the temp. table created for the merge into the target. We have the limitation of 30 min Athena runs and processing one day can take quite a while.

Idk if it helps processing your source tables - for us it is working like a charm for months now.

@jessedobbelaere
Copy link
Contributor

Nice to see you added Iceberg support @CommonCrisis 👌 I wonder if there's a future in the dbt-athena project for that macro or if we should keep it third-party. It has helped us a lot as well to process big source tables that otherwise timeout on initial run

@CommonCrisis
Copy link
Contributor

Nice to see you added Iceberg support @CommonCrisis 👌 I wonder if there's a future in the dbt-athena project for that macro or if we should keep it third-party. It has helped us a lot as well to process big source tables that otherwise timeout on initial run

I guess third-party and as dbt package would make sense. Like dbt-utils but for athena related macros

@nicor88
Copy link
Contributor

nicor88 commented Aug 12, 2024

I like the idea of another package that might include the insert-by-period macros/utilities.
In the past we had quite some users using what @jessedobbelaere proposed.

I'm wondering where shall we put such utility package and how can we call it?

@CommonCrisis
Copy link
Contributor

Why not simply dbt-athena-utils in https://github.com/dbt-athena - everybody knows dbt-utils so everybody would know what to expect

@nicor88
Copy link
Contributor

nicor88 commented Aug 12, 2024

We have https://github.com/dbt-athena/athena-utils that is the main reference for athena-utils to be used within dbt. How about that?

@CommonCrisis
Copy link
Contributor

CommonCrisis commented Aug 12, 2024

sure why not. We can add add dbt_custom_materializations folder there.

But ideally we also make the adapter more flexible. E.g. I had to copy paste the build iceberg logic into a separate file to make it work with the insert_by_period macro.

@nicor88
Copy link
Contributor

nicor88 commented Aug 12, 2024

@CommonCrisis absolutely, we can refactor what we need in the adapter to avoid overwriting as much as we can - I will have a look at your https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main to check the implementation.

@jessedobbelaere
Copy link
Contributor

💡 Last night, dbt made this announcement about a new incremental-strategy for time-series event data. It introduces incremental_strategy='microbatch' to enable micro-batching to replace the custom insert-by-period macro's in the wild. It also seems to simplify things a lot, less config and including sampling for dev runs. Let's focus on integrating with this feature when it comes out?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants