Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Table Partitioning Option for PostgreSQL #168

Open
sheyd opened this issue Jul 30, 2019 · 13 comments · May be fixed by #78
Open

Add Table Partitioning Option for PostgreSQL #168

sheyd opened this issue Jul 30, 2019 · 13 comments · May be fixed by #78
Labels
wontfix This will not be worked on

Comments

@sheyd
Copy link

sheyd commented Jul 30, 2019

Feature

Feature description

Table partitioning has existed since PostgreSQL 9, but in PostgreSQL 11 has been made significantly easier to use and more performant.

Who will this benefit?

Anyone who has solid Medium-Sized™ data that doesn't quite need the complexity or resources provided by BigQuery/RedShift/Snowflake.

For instance, in our current environment we have a table for all email messages sent to every user historically that is 200 million rows. While queryable, it could definitely benefit from using a range partition based on dates or even user groups.

@drewbanin
Copy link

Thanks for the request @sheyd!

I took a look at the docs you've sent over. Table partitioning on pg looks pretty slick but also very involved.

There are a couple of challenges for us to sort out here: in dbt, users don't specify table DDL. Instead, dbt creates tables with the schema described in create table as queries generated from model select statements. It doesn't appear to me that there's a way to specify table partitioning in a create table as statement - do you know if such a thing is possible?

One approach would be to create an empty table from the specified select statement, then create a table like that other table, copying over the schema and adding a partitioning clause. In practice, this might look like:

create table dbt_dbanin.tbl_partitioning_test__dbt_tmp as (

        -- model SELECT statement (with a limit 0)
	select
		1 as id,
		'drew'::text as name,
		'green'::text as favorite_color
		
	limit 0

);

-- create the partitioned table from the __tmp table definition
create table dbt_dbanin.tbl_partitioning_test
(
	like dbt_dbanin.tbl_partitioning_test__dbt_tmp
)
partition by HASH (favorite_color);

While this might work for creating the partitioned table, it's really only a tiny part of the overall implementation. The docs here make it look to me like dbt would need to create one table per partition, specifying the range of dates in that partition. Further, there's a bunch of examples of custom functions like

CREATE RULE measurement_insert_y2006m02 AS
ON INSERT TO measurement WHERE
    ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' )
DO INSTEAD
    INSERT INTO measurement_y2006m02 VALUES (NEW.*);

or

CREATE OR REPLACE FUNCTION measurement_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO measurement_y2008m01 VALUES (NEW.*);
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

There's also a discussion of triggers and this sort of wild line:

We must redefine the trigger function each month so that it always points to the current child table. The trigger definition does not need to be updated, however.

In all, this feels like a pretty cumbersome process, and one that's a little outside the realm of how dbt usually operates!

I am acutely interested in figuring out how dbt can better create date-partitioned datasets on data warehouses (like redshift / snowflake / bigquery), so I'm super happy to spend the time to explore some approaches on postgres too. As it stands though, I don't imagine this is something we'd implement in dbt-core. Maybe it's a good use for a Custom Materialization?

Let me know if I'm overlooking or overcomplicating anything, and thanks again for the suggestion!

@drewbanin drewbanin added the wontfix This will not be worked on label Aug 6, 2019
@drewbanin
Copy link

Closing this as I don't think it's something we're going to prioritize in the next 6 months. Happy to re-open if anyone feels strongly!

@joeyfezster
Copy link

joeyfezster commented Nov 1, 2021

With the recent release of Postgres 14, Postgres further solidifies itself as a great choice for sub-petabyte scale.

I would ask you to reconsider the prioritization of this, as it can come to significant cost savings for dbt-lab's clients.

Numerous performance improvements have been made for parallel queries, heavily-concurrent workloads, partitioned tables, logical replication, and vacuuming.
Improve the performance of updates and deletes on partitioned tables with many partitions (Amit Langote, Tom Lane)

  • Improve the performance of updates and deletes on partitioned tables with many partitions (Amit Langote, Tom Lane)
    This change greatly reduces the planner's overhead for such cases, and also allows updates/deletes on partitioned tables to use execution-time partition pruning.
  • Allow partitions to be detached in a non-blocking manner (Álvaro Herrera)
    The syntax is ALTER TABLE ... DETACH PARTITION ... CONCURRENTLY, and FINALIZE
  • Ignore COLLATE clauses in partition boundary values (Tom Lane)
    Previously any such clause had to match the collation of the partition key; but it's more consistent to consider that it's automatically coerced to the collation of the partition key.

Current docs

@jtcohen6
Copy link
Contributor

jtcohen6 commented Nov 3, 2021

@joeyfezster Thanks for the bump! Do you have a sense of whether the implementation required for Postgres partitions has become any simpler in the years since this issue was originally opened? A lot of the original concerns still seem relevant.

In the meantime, there's nothing blocking from someone implementing this as a custom materialization in user-space code (i.e., no fork of dbt necessary). I'd be interested to see the complexity required, before deciding whether this should live in the dbt-postgres adapter plugin and be available out of the box.

@techtangents
Copy link

techtangents commented Nov 12, 2021

Hi,

I would also value this functionality. I think postgres' ability to function effectively as a data warehouse depends on partitioning.

It seems that the simplest way to deal with partitions is to use the pg_partman extension. AWS have a good tutorial on it. https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL_Partitions.html

So, when creating the table, dbt would create the table as normal, but with a PARTITION BY clause. Then it could call partman.create_parent:

 p_control => 'created_at',
 p_type => 'native',
 p_interval=> 'daily',
 p_premake => 30);

Also, during each dbt run, before inserting any data into the table, it should run partman.run_maintenance_proc, so partman can make sure all the required partitions are created.

Would you please consider reopening this issue?

Kind regards

@techtangents
Copy link

Also - would it work if I created partitioned tables manually? Would dbt just use them as if they're normal tables?

@transistormuncher
Copy link

transistormuncher commented Oct 7, 2022

Hi everyone,
I understand the difficulties mentioned by @drewbanin. And I agree with everyone else that this would be a very valuable feature for companies that rely on Postgres as an open source solution. I would appreciate it greatly if this could be supported by dbt-core.

@jtcohen6 Thank you for the following comment:

In the meantime, there's nothing blocking from someone implementing this as a custom materialization in user-space code

Could you be so kind to point me in the right direction on how to do this? In the dbt docs I did not find anything regarding custom materializations.

Thank you

@IS-Josh
Copy link

IS-Josh commented Sep 19, 2023

Also - would it work if I created partitioned tables manually? Would dbt just use them as if they're normal tables?

I'm using AlloyDB with list partitioned tables and yes DBT can use partitioned tables on Postgres that are created prior to running DBT. We use incremental loading. You can opt for incremental and load just new rows, or if you want to replace entire partitions or data you can choose incremental and set the unique identifier to be the key you are partitioning on.

The default DBT patterns are not very optimal for replacing entire partitions of data. (large scale deletes and inserts can be slow) To combat this you may need to make adjustments to the incremental materialization to enable better partition pruning. particularly when using a composite key for your unique key.

The default incremental materialization out of the box will probably work okay for most simple use cases but you're likely to run into a few performance issues if your workload is anything but simple.

I've been tinkering with a few components/ideas that could eventually go into a new materialisation to enable hot-swapping of entire partitions for Postgres but don't really have time to build out the materialisation. Happy to share my thoughts if anyone is looking at tackling this,

@IS-Josh
Copy link

IS-Josh commented Sep 19, 2023

Thanks for the request @sheyd!

I took a look at the docs you've sent over. Table partitioning on pg looks pretty slick but also very involved.

There are a couple of challenges for us to sort out here: in dbt, users don't specify table DDL. Instead, dbt creates tables with the schema described in create table as queries generated from model select statements. It doesn't appear to me that there's a way to specify table partitioning in a create table as statement - do you know if such a thing is possible?

One approach would be to create an empty table from the specified select statement, then create a table like that other table, copying over the schema and adding a partitioning clause. In practice, this might look like:

create table dbt_dbanin.tbl_partitioning_test__dbt_tmp as (

        -- model SELECT statement (with a limit 0)
	select
		1 as id,
		'drew'::text as name,
		'green'::text as favorite_color
		
	limit 0

);

-- create the partitioned table from the __tmp table definition
create table dbt_dbanin.tbl_partitioning_test
(
	like dbt_dbanin.tbl_partitioning_test__dbt_tmp
)
partition by HASH (favorite_color);

While this might work for creating the partitioned table, it's really only a tiny part of the overall implementation. The docs here make it look to me like dbt would need to create one table per partition, specifying the range of dates in that partition. Further, there's a bunch of examples of custom functions like

CREATE RULE measurement_insert_y2006m02 AS
ON INSERT TO measurement WHERE
    ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' )
DO INSTEAD
    INSERT INTO measurement_y2006m02 VALUES (NEW.*);

or

CREATE OR REPLACE FUNCTION measurement_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO measurement_y2008m01 VALUES (NEW.*);
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

There's also a discussion of triggers and this sort of wild line:

We must redefine the trigger function each month so that it always points to the current child table. The trigger definition does not need to be updated, however.

In all, this feels like a pretty cumbersome process, and one that's a little outside the realm of how dbt usually operates!

I am acutely interested in figuring out how dbt can better create date-partitioned datasets on data warehouses (like redshift / snowflake / bigquery), so I'm super happy to spend the time to explore some approaches on postgres too. As it stands though, I don't imagine this is something we'd implement in dbt-core. Maybe it's a good use for a Custom Materialization?

Let me know if I'm overlooking or overcomplicating anything, and thanks again for the suggestion!

Thanks for the request @sheyd!

I took a look at the docs you've sent over. Table partitioning on pg looks pretty slick but also very involved.

There are a couple of challenges for us to sort out here: in dbt, users don't specify table DDL. Instead, dbt creates tables with the schema described in create table as queries generated from model select statements. It doesn't appear to me that there's a way to specify table partitioning in a create table as statement - do you know if such a thing is possible?

One approach would be to create an empty table from the specified select statement, then create a table like that other table, copying over the schema and adding a partitioning clause. In practice, this might look like:

create table dbt_dbanin.tbl_partitioning_test__dbt_tmp as (

        -- model SELECT statement (with a limit 0)
	select
		1 as id,
		'drew'::text as name,
		'green'::text as favorite_color
		
	limit 0

);

-- create the partitioned table from the __tmp table definition
create table dbt_dbanin.tbl_partitioning_test
(
	like dbt_dbanin.tbl_partitioning_test__dbt_tmp
)
partition by HASH (favorite_color);

While this might work for creating the partitioned table, it's really only a tiny part of the overall implementation. The docs here make it look to me like dbt would need to create one table per partition, specifying the range of dates in that partition. Further, there's a bunch of examples of custom functions like

CREATE RULE measurement_insert_y2006m02 AS
ON INSERT TO measurement WHERE
    ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' )
DO INSTEAD
    INSERT INTO measurement_y2006m02 VALUES (NEW.*);

or

CREATE OR REPLACE FUNCTION measurement_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO measurement_y2008m01 VALUES (NEW.*);
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

There's also a discussion of triggers and this sort of wild line:

We must redefine the trigger function each month so that it always points to the current child table. The trigger definition does not need to be updated, however.

In all, this feels like a pretty cumbersome process, and one that's a little outside the realm of how dbt usually operates!

I am acutely interested in figuring out how dbt can better create date-partitioned datasets on data warehouses (like redshift / snowflake / bigquery), so I'm super happy to spend the time to explore some approaches on postgres too. As it stands though, I don't imagine this is something we'd implement in dbt-core. Maybe it's a good use for a Custom Materialization?

Let me know if I'm overlooking or overcomplicating anything, and thanks again for the suggestion!

The way I was looking at approaching this is to have dbt create the table as it normally would. however, this would just be a temporary table. The operation can be pretty fast as long as you add in a 1=2 kind of filter in the outer select so that the product is an empty table. Once this is created you can query the system catalogue and use the materialisation metadata to create a new table appending the partitioned by clause with the field you want. (assume partition by list)

I was then thinking about hot-swapping partitions, which involved creating a new partition with a dummy partition value (can't create two partitions with the same partition value ) You can then detach the partition and then load the table as normal. once loaded, you would need to detach the previously active partition and attach the new partition.

This is a simplification of the details required to implement this as you would also normally want the materialization to be safe in the event of a failure/disconnect which would involve a decent amount of thought around how to run in a transaction or if that isn't feasible with the mix of operations then ensuring that the job is recoverable. Its been a few months since i really looked a this but i think i was also coming across the issue of having to ensure constraint names/indexes were not duplicated.

@ggam
Copy link

ggam commented Oct 13, 2023

I'm interested in this feature too, but I don't have the knowledge to implement it myself. I think there are two usecases here:
1- Incremental by hot swapping partitions. Example: partition by month. Dbt creates new partitions and swaps the affected ones. This would be the most sophisticated case as described by @IS-Josh.
2- Incremental by delete+write as it's now, just with underlying partitioned tables.

The second case should be way simpler to implement, at the expense of more end-user care. What does the simple case bring to the table? Basically, I could use a different storage (Citus Columnar) on non-changing partitions. Also, partitioning by date should reduce the need for some indexes.

@ggam
Copy link

ggam commented Apr 28, 2024

Just in case it helps anyone, I have a very rough implementation of partitioning by overriding a Postgres macro:

{% macro postgres__create_table_as(temporary, relation, sql) -%}
  {%- set unlogged = config.get('unlogged', default=false) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {{ sql_header if sql_header is not none }}

  {% if config.get('partition_by') != None %}
    {# Use partitioning #}

    {%- set partition_by_field = config.get('partition_by')['field'] -%}
    {%- set partition_by_granularity = config.get('partition_by')['granularity'] -%}

    -- We cannot create a partitioned table with "create as select".
    -- Create a dummy temporary table just to be used as a template for the real one
    create temporary table "{{ this.identifier }}__tt" as
    select *
    from ({{ sql }}) model_subq
    where 1 = 2;

    -- Create partitioned table as a copy of the template
    create {% if temporary -%}
        temporary
      {%- elif unlogged -%}
        unlogged
      {%- endif %} table {{ relation }} (like "{{ this.identifier }}__tt")
      partition by range ({{ partition_by_field }});


    {% set queryRequiredPartitions %}
      -- We need to create the partitions before the parent table is able to handle inserts
      with partitions as (
          -- Get the begin and end dates by granularity.
          -- Generate a suffix to append to every partition. This is to avoid collisions as the children tables are not renamed at the end of the process
          -- Old children will just be deleted when dbt drops its parent
          select
              generate_series(
                  date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})),
                  date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})),
                  '1 {{ partition_by_granularity }}'::interval
              ) as begin_date,
              (select floor(random() * 100 + 1)::int) as rand -- Random number to avoid name colissions
          from ({{ sql }}) model_subq
      )
      select
          -- Generate the final data
          begin_date,
          begin_date + '1 {{ partition_by_granularity }}'::interval as end_date,
          to_char(begin_date, 'yyyymmdd') || '_' || rand as partition_suffix
      from partitions;
    {% endset %}
    {% set requiredPartitions = run_query(queryRequiredPartitions) %}

    -- Create the required partitions
    {% for requiredPartition in requiredPartitions.rows %}
      create
      {% if temporary -%}
        temporary
      {%- elif unlogged -%}
        unlogged
      {%- endif %}
      table
      {{ make_intermediate_relation(relation, requiredPartition['partition_suffix']) }}
      partition of {{ relation }} for values from ('{{ requiredPartition["begin_date"] }}'::timestamp) to ('{{ requiredPartition["end_date"] }}'::timestamp);
    {% endfor %}

    -- Insert into the parent table
    insert into {{ relation }}
    {{ sql }};
  {% else %}
    {# Standard procedure, copied from: https://github.com/dbt-labs/dbt-postgres/blob/cf100560e7712dcc387e286057ffeeccb23a944e/dbt/include/postgres/macros/adapters.sql#L1 #}
    {# TODO: delegate to the standard implementation #}
     create {% if temporary -%}
        temporary
      {%- elif unlogged -%}
        unlogged
      {%- endif %} table {{ relation }}
      {% set contract_config = config.get('contract') %}
      {% if contract_config.enforced %}
        {{ get_assert_columns_equivalent(sql) }}
      {% endif -%}
      {% if contract_config.enforced and (not temporary) -%}
          {{ get_table_columns_and_constraints() }} ;
        insert into {{ relation }} (
          {{ adapter.dispatch('get_column_names', 'dbt')() }}
        )
        {%- set sql = get_select_subquery(sql) %}
      {% else %}
        as
      {% endif %}
      (
        {{ sql }}
      );
  {% endif %}
{%- endmacro %}


{% macro postgres__rename_relation(from_relation, to_relation) %}
  {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %}
  {% call statement('rename_relation') -%}
    alter table {{ from_relation }} rename to {{ target_name }};

    {# TODO: check if this is a partitioned table and rename its children #}
  {%- endcall %}
{% endmacro %}

To use it:

{{
config(
    materialized = 'table',
    partition_by={
      "field": "created_at",
      "granularity": "month"
    }
) }}

select generate_series(
        current_date - interval '1000 day',
        current_date, '1 day'::interval)::date as created_at, 'hello' as dummy_text

I plan to create a pull request for this to the dbt-postgres repo once I fully test it. Next steps would be to create an incremental materialization that drops and recreates whole partitions instead of merging.

PS: with this approach, the select query needs to be run twice (one for getting the list of partitions and another one for actually inserting the data). Another more lightweight option is to use pg_partman extension for this.

PS2: Postgres 17 plans to add split and merge commands (https://www.dbi-services.com/blog/postgresql-17-split-and-merge-partitions/) which would also add the option to 1- create just one default partition with all values 2- select from the just created table to get min/max dates or values list (which might be faster than repeating the original query, especially if there column is indexed) and 3- split into the required partitions.

@ggam
Copy link

ggam commented Apr 29, 2024

For completeness, a very rough implementation of a custom incremental partitioning strategy:

The strategy macro:

{% macro get_incremental_partition_sql(arg_dict) %}

    {%- set target = arg_dict["target_relation"] -%}
    {%- set source = arg_dict["temp_relation"] -%}
    {%- set dest_columns = arg_dict["dest_columns"] -%} 
    {%- set incremental_predicates = arg_dict["incremental_predicates"] -%}

    {% if incremental_predicates is none %}
      {% do exceptions.raise_compiler_error('Config "incremental_predicates" is required!') %}
    {% endif %}

    {# Get the updated partitions that need to be dropped #}
    {% set queryToDropPartitions %}
        select distinct
          ns.nspname as schema_name,
          c.relname as partition_name
          --pg_get_expr(c.relpartbound, c.oid, true) as partition_expression
        from (
            select distinct tableoid
            from {{ target }} lt
            where 1=1
              {% for predicate in incremental_predicates %}
                  and {{ predicate }}
              {% endfor %}
        ) lt
        left join pg_class c on lt.tableoid = c.oid
        left join pg_namespace ns on c.relnamespace = ns.oid 
    {% endset %}

    -- Drop outdated partitions
    {% set toDropPartitions = run_query(queryToDropPartitions) %}
    {% for toDropPartition in toDropPartitions.rows %}
      drop table "{{ toDropPartition.schema_name }}"."{{ toDropPartition.partition_name }}";
    {% endfor %}

    -- Source relation is already partitioned. We need to:
    -- 1- Get the list of source partitions
    -- 2- Create the same definition on the target, partitioned table
    -- 3- Copy the whole content of the source table into the target on
    {% set queryToCopyPartitions %}
        select distinct
          ns.nspname as schema_name,
          c.relname as partition_name,
          pg_get_expr(c.relpartbound, c.oid, true) as partition_expression,
          c.relname || '_' || (select floor(random() * 100 + 1)::int) as new_partition_name -- Random number to avoid name colissions
        from (
            select distinct tableoid
            from {{ source }} lt
        ) lt
        left join pg_class c on lt.tableoid = c.oid
        left join pg_namespace ns on c.relnamespace = ns.oid 
    {% endset %}

    {% set requiredPartitions = run_query(queryToCopyPartitions) %}
    {% for requiredPartition in requiredPartitions.rows %}
      -- Duplicate table partition name. The original one is temporary
      create {%- if unlogged -%}unlogged {%- endif %} table
      "{{ schema }}"."{{ requiredPartition.new_partition_name }}"
      partition of {{ target }} {{ requiredPartition.partition_expression }};

      {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
      -- Copy the data to the partitioned table
      insert into {{ target }} ({{ dest_cols_csv }})
      (
          select {{ dest_cols_csv }}
          from {{ requiredPartition.partition_name }}
      );

    {% endfor %}
{% endmacro %}

The model:

{{ config(
    materialized = 'incremental',
    partition_by = {
      "field": "created_at",
      "granularity": "month"
    },
    incremental_strategy = 'partition',
    incremental_predicates = ["created_at >= date_trunc('month', current_date - 90)"]
)
}}

with raw_data as (
  select generate_series(
          current_date - interval '2000 day',
          current_date, '1 day'::interval)::date as created_at, 'hello' as dummy_text
)
select *
from raw_data
{% if is_incremental() %}
    where created_at >= date_trunc('month', current_date - 90)
{% endif %}

@ggam ggam linked a pull request May 1, 2024 that will close this issue
10 tasks
@ggam
Copy link

ggam commented May 1, 2024

Opened a PR on #78. If you are interested in this feature, please show your support and feedback!

@amychen1776 amychen1776 reopened this Oct 22, 2024
@amychen1776 amychen1776 transferred this issue from dbt-labs/dbt-core Oct 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
wontfix This will not be worked on
Projects
None yet
Development

Successfully merging a pull request may close this issue.

9 participants