Skip to content

Commit

Permalink
Fix incremental run time issues (#1193)
Browse files Browse the repository at this point in the history
In a previous PR, the aim was to standardize the incremental strategy across spells: https://github.com/duneanalytics/abstractions/commit/9ae14c5289a85605314bcd6fd49efcb05567eecc



We were using the « merge » strategy, which updates existing rows when the underlying data changes anywhere in the table and appends new data. While I remain convinced that the merge strategy is the best one for ensuring data quality, it unfortunately came at a cost: quarter-hourly jobs could not finish in under 15 minutes, which caused jobs to queue up.

To mitigate this, I propose using the « default » incremental strategy, which simply appends new data incrementally (without updating rows that already exist in the dataset). The big advantage is that tables are updated around twice as fast as with the merge strategy. To compensate for the fact that existing data can’t be updated, I added uniqueness tests to all incremental tables so we can regularly verify that there are no duplicates and that data quality is good.


I've checked that:

* [x] I tested the query on dune.com after compiling the model with dbt compile (compiled queries are written to the target directory)
* [x] I used "refs" to reference other models in this repo and "sources" to reference raw or decoded tables 
* [x] the directory tree matches the pattern /sector/blockchain/ e.g. /tokens/ethereum
* [x] if adding a new model, I added a test
* [x] the filename is unique and ends with .sql
* [x] each file has only one view, table or function defined  
* [x] column names are `lowercase_snake_cased`
  • Loading branch information
soispoke authored Jun 20, 2022
1 parent 4e55585 commit 19e8270
Show file tree
Hide file tree
Showing 20 changed files with 163 additions and 85 deletions.
36 changes: 33 additions & 3 deletions spellbook/macros/alter_table_properties.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,33 @@ ALTER VIEW balances_ethereum.erc721_latest SET TBLPROPERTIES('dune.public'='true
'dune.data_explorer.contributors'='["hildobby","soispoke","dot2dotseurat"]');
{% endset %}

{# Captures the ALTER VIEW statement that marks balances_ethereum.erc1155_day as public and sets its Dune data-explorer properties (blockchains, category, abstraction type/name, contributors). The captured text is executed further down via run_query. #}
{% set balances_ethereum_erc1155_day %}
ALTER VIEW balances_ethereum.erc1155_day SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.blockchains'='["ethereum"]',
'dune.data_explorer.category'='abstraction',
'dune.data_explorer.abstraction.type'='sector',
'dune.data_explorer.abstraction.name'='balances',
'dune.data_explorer.contributors'='["soispoke"]');
{% endset %}

{# Captures the ALTER VIEW statement that marks balances_ethereum.erc1155_hour as public and sets its Dune data-explorer properties (blockchains, category, abstraction type/name, contributors). The captured text is executed further down via run_query. #}
{% set balances_ethereum_erc1155_hour %}
ALTER VIEW balances_ethereum.erc1155_hour SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.blockchains'='["ethereum"]',
'dune.data_explorer.category'='abstraction',
'dune.data_explorer.abstraction.type'='sector',
'dune.data_explorer.abstraction.name'='balances',
'dune.data_explorer.contributors'='["soispoke"]');
{% endset %}

{# Captures the ALTER VIEW statement that marks balances_ethereum.erc1155_latest as public and sets its Dune data-explorer properties (blockchains, category, abstraction type/name, contributors). The captured text is executed further down via run_query. #}
{% set balances_ethereum_erc1155_latest %}
ALTER VIEW balances_ethereum.erc1155_latest SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.blockchains'='["ethereum"]',
'dune.data_explorer.category'='abstraction',
'dune.data_explorer.abstraction.type'='sector',
'dune.data_explorer.abstraction.name'='balances',
'dune.data_explorer.contributors'='["soispoke"]');
{% endset %}

{% set magiceden_trades %}
ALTER TABLE magiceden.trades SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.blockchains'='["solana"]',
Expand Down Expand Up @@ -136,8 +163,8 @@ ALTER VIEW tokens_ethereum.nft SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.contributors'='["dot2dotseurat","soispoke"]');
{% endset %}

{% set uniswap_ethereum_trades %}
ALTER TABLE uniswap_ethereum.trades SET TBLPROPERTIES('dune.public'='true',
{% set uniswap_trades %}
ALTER TABLE uniswap.trades SET TBLPROPERTIES('dune.public'='true',
'dune.data_explorer.blockchains'='["ethereum"]',
'dune.data_explorer.category'='abstraction',
'dune.data_explorer.abstraction.type'='project',
Expand All @@ -151,6 +178,9 @@ ALTER TABLE uniswap_ethereum.trades SET TBLPROPERTIES('dune.public'='true',
{% do run_query(balances_ethereum_erc721_day) %}
{% do run_query(balances_ethereum_erc721_hour) %}
{% do run_query(balances_ethereum_erc721_latest) %}
{% do run_query(balances_ethereum_erc1155_day) %}
{% do run_query(balances_ethereum_erc1155_hour) %}
{% do run_query(balances_ethereum_erc1155_latest) %}
{% do run_query(magiceden_trades) %}
{% do run_query(nft_trades) %}
{% do run_query(opensea_active_traders_day) %}
Expand All @@ -160,7 +190,7 @@ ALTER TABLE uniswap_ethereum.trades SET TBLPROPERTIES('dune.public'='true',
{% do run_query(tokens_ethereum_erc20) %}
{% do run_query(transfers_ethereum_erc20) %}
{% do run_query(tokens_ethereum_nft) %}
{% do run_query(uniswap_ethereum_trades) %}
{% do run_query(uniswap_trades) %}

{% do log("Tables generated", info=True) %}
{%- else -%}
Expand Down
16 changes: 13 additions & 3 deletions spellbook/macros/optimize_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ OPTIMIZE transfers_ethereum.erc721_agg_hour;
OPTIMIZE transfers_ethereum.erc721_agg_day;
{% endset %}

{# Captures the OPTIMIZE statement for the hourly ERC-1155 transfer aggregate table; the captured text is executed further down via run_query. #}
{% set transfers_ethereum_erc1155_agg_hour %}
OPTIMIZE transfers_ethereum.erc1155_agg_hour;
{% endset %}

{# Captures the OPTIMIZE statement for the daily ERC-1155 transfer aggregate table; the captured text is executed further down via run_query. #}
{% set transfers_ethereum_erc1155_agg_day %}
OPTIMIZE transfers_ethereum.erc1155_agg_day;
{% endset %}

{# Captures the OPTIMIZE statement for opensea.trades; the captured text is executed further down via run_query. #}
{% set opensea_trades %}
OPTIMIZE opensea.trades;
{% endset %}
Expand All @@ -24,8 +32,8 @@ OPTIMIZE opensea.trades;
OPTIMIZE magiceden.trades;
{% endset %}

{% set uniswap_ethereum_trades %}
OPTIMIZE uniswap_ethereum.trades;
{% set uniswap_trades %}
OPTIMIZE uniswap.trades;
{% endset %}

{% set nft_trades %}
Expand All @@ -37,9 +45,11 @@ OPTIMIZE nft.trades;
{% do run_query(transfers_ethereum_erc20_agg_day) %}
{% do run_query(transfers_ethereum_erc721_agg_hour) %}
{% do run_query(transfers_ethereum_erc721_agg_day) %}
{% do run_query(transfers_ethereum_erc1155_agg_hour) %}
{% do run_query(transfers_ethereum_erc1155_agg_day) %}
{% do run_query(opensea_trades) %}
{% do run_query(magiceden_trades) %}
{% do run_query(uniswap_ethereum_trades) %}
{% do run_query(uniswap_trades) %}
{% do run_query(nft_trades) %}

{% do log("Tables Optimized", info=True) %}
Expand Down
4 changes: 3 additions & 1 deletion spellbook/models/magiceden/magiceden_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ models:
- name: amount_usd
description: "USD value of the trade at time of execution"
- name: unique_trade_id
description: "Unique trade ID (derived from signatures[0] and id in solana transactions)"
description: "Unique trade ID (derived from signatures[0] and id in solana transactions)"
tests:
- unique
17 changes: 7 additions & 10 deletions spellbook/models/magiceden/magiceden_trades.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
{{ config(
alias='trades',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_trade_id'
)
{# Model config: exposed under the alias `trades` and built incrementally. dbt reads the materialization from the `materialized` key; the previous spelling `materialize` is not a key dbt acts on, which left the model on the project's default materialization. NOTE(review): once this is truly incremental, confirm the is_incremental() filter below does not re-append rows already stored. #}
{{
config(
alias='trades', materialized = 'incremental')
}}

SELECT blockchain, 'magiceden' as project, '' as version, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id FROM
SELECT blockchain, 'magiceden' as project, '' as version, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id
FROM
(
SELECT blockchain, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id
FROM {{ ref('magiceden_solana_trades') }}
SELECT blockchain, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id
FROM {{ ref('magiceden_solana_trades') }}
)
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
Expand Down
2 changes: 0 additions & 2 deletions spellbook/models/magiceden/solana/magiceden_solana_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ models:
columns:
- name: unique_trade_id
description: "Unique trade id, combination of signatures[0] and id"
tests:
- unique
- &blockchain
name: blockchain
description: "Blockchain"
Expand Down
10 changes: 3 additions & 7 deletions spellbook/models/nft/nft_trades.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
{{ config(
alias='trades',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_trade_id'
)
{# Model config: exposed under the alias `trades` and built incrementally. dbt reads the materialization from the `materialized` key; the previous spelling `materialize` is not a key dbt acts on, which left the model on the project's default materialization. NOTE(review): once this is truly incremental, confirm the model's incremental filter does not re-append rows already stored. #}
{{
config(
alias='trades', materialized = 'incremental')
}}

SELECT blockchain, project, version, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id FROM
Expand Down
2 changes: 0 additions & 2 deletions spellbook/models/opensea/ethereum/opensea_ethereum_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ models:
columns:
- name: unique_trade_id
description: "Unique trade id, combination of tx_hash and trade_id"
tests:
- unique
- &blockchain
name: blockchain
description: "Blockchain"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
{{
config(
alias='trades'
)
}}
{{ config(alias='trades') }}

SELECT
evt_tx_hash || '-' || evt_index::string as unique_trade_id,
Expand Down
2 changes: 2 additions & 0 deletions spellbook/models/opensea/opensea_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ models:
description: "USD value of the trade at time of execution"
- name: unique_trade_id
description: "Unique trade ID"
tests:
- unique

- name: opensea_volume_day
meta:
Expand Down
10 changes: 3 additions & 7 deletions spellbook/models/opensea/opensea_trades.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
{{ config(
alias='trades',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_trade_id'
)
{# Model config: exposed under the alias `trades` and built incrementally. dbt reads the materialization from the `materialized` key; the previous spelling `materialize` is not a key dbt acts on, which left the model on the project's default materialization. NOTE(review): once this is truly incremental, confirm the model's incremental filter does not re-append rows already stored. #}
{{
config(
alias='trades', materialized = 'incremental')
}}

SELECT blockchain, 'opensea' as project, 'v1' as version, tx_hash, block_time, amount_usd, amount, token_symbol, token_address, unique_trade_id FROM
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc1155_agg_day',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
materialized ='incremental'
)
}}

Expand All @@ -18,7 +15,7 @@ select
FROM {{ ref('transfers_ethereum_erc1155') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where evt_block_time > now() - interval 2 days
where date_trunc('day', evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('day', evt_block_time), wallet_address, token_address, tokenId, unique_tx_id
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc1155_agg_hour',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
materialized ='incremental'
)
}}

Expand All @@ -18,7 +15,7 @@ select
FROM {{ ref('transfers_ethereum_erc1155') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where evt_block_time > now() - interval 2 days
where date_trunc('hour', evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('hour', evt_block_time), wallet_address, token_address, tokenId, unique_tx_id
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc20_agg_day',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
materialized ='incremental'
)
}}

Expand All @@ -20,7 +17,7 @@ from {{ ref('transfers_ethereum_erc20') }} tr
left join {{ ref('tokens_ethereum_erc20') }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where tr.evt_block_time > now() - interval 2 days
where date_trunc('day', tr.evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('day', tr.evt_block_time), tr.wallet_address, tr.token_address, t.symbol,unique_tx_id
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc20_agg_hour',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
materialized ='incremental'
)
}}

Expand All @@ -20,7 +17,7 @@ from {{ ref('transfers_ethereum_erc20') }} tr
left join {{ ref('tokens_ethereum_erc20') }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where tr.evt_block_time > now() - interval 2 days
where date_trunc('hour', tr.evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('hour', tr.evt_block_time), tr.wallet_address, tr.token_address, t.symbol,unique_tx_id
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc721_agg_day',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
alias ='erc721_agg_day',
materialized ='incremental'
)
}}

Expand All @@ -17,7 +14,7 @@ select
from {{ ref('transfers_ethereum_erc721') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where evt_block_time > now() - interval 2 days
where date_trunc('day', evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('day', evt_block_time), wallet_address, token_address, tokenId,unique_tx_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{{ config(
alias ='erc721_agg_hour',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_transfer_id'
alias ='erc721_agg_hour',
materialized ='incremental'
)
}}

Expand All @@ -17,7 +14,7 @@ select
from {{ ref('transfers_ethereum_erc721') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where evt_block_time > now() - interval 2 days
where date_trunc('hour', evt_block_time) > now() - interval 2 days
{% endif %}
group by
date_trunc('hour', evt_block_time), wallet_address, token_address, tokenId,unique_tx_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ models:
description: "Receive this transaction"
- &unique_trade_id
name: unique_trade_id
description: "Unique trade ID (derived from tx_hash and evt_index for ethereum dex trades)"
tests:
- unique
description: "Unique trade ID (derived from tx_hash and evt_index for ethereum dex trades)"

- name: uniswap_v2_ethereum_trades
meta:
Expand Down
11 changes: 1 addition & 10 deletions spellbook/models/uniswap/ethereum/uniswap_ethereum_trades.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
{{
config(
alias='trades',
materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
unique_key='unique_trade_id'
)
alias='trades')
}}

SELECT blockchain, project, version, block_time, token_a_symbol, token_b_symbol,
Expand All @@ -14,7 +9,3 @@ SELECT blockchain, project, version, block_time, token_a_symbol, token_b_symbol,
FROM (SELECT * FROM {{ ref('uniswap_v2_ethereum_trades') }}
UNION ALL
SELECT * FROM {{ ref('uniswap_v3_ethereum_trades') }})
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE block_time > now() - interval 2 days
{% endif %}
13 changes: 13 additions & 0 deletions spellbook/models/uniswap/uniswap_trades.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{#
    Uniswap trades model, exposed under the alias `trades` and built incrementally.
    Rows are read from the `uniswap_ethereum_trades` model and projected onto an
    explicit column list (the column order is kept stable because incremental
    appends insert into the existing table).

    The config key must be spelled `materialized`; with `materialize` dbt does not
    apply the incremental materialization.
#}
{{
    config(
        alias='trades',
        materialized='incremental'
    )
}}

SELECT
    blockchain,
    project,
    version,
    block_time,
    token_a_symbol,
    token_b_symbol,
    token_a_amount,
    token_b_amount,
    trader_a,
    trader_b,
    usd_amount,
    token_a_address,
    token_b_address,
    exchange_contract_address,
    tx_hash,
    tx_from,
    tx_to,
    unique_trade_id
FROM {{ ref('uniswap_ethereum_trades') }}
{% if is_incremental() %}
-- This filter is only applied on an incremental run.
-- Only trades newer than the latest one already stored are appended. A fixed
-- look-back window (e.g. now() - interval 2 days) would re-insert the same rows
-- on every run, because the default incremental strategy appends without
-- deduplicating, and that would break the uniqueness of unique_trade_id.
-- COALESCE covers the case where the target table exists but is still empty.
WHERE block_time > (
    SELECT COALESCE(MAX(block_time), TIMESTAMP '1970-01-01 00:00:00')
    FROM {{ this }}
)
{% endif %}
Loading

0 comments on commit 19e8270

Please sign in to comment.