-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
2 changed files
with
186 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
name: dbt_run_temp_reload_traces | ||
run-name: dbt_run_temp_reload_traces | ||
|
||
on: | ||
workflow_dispatch: | ||
schedule: | ||
# Runs “At minute 9,” (see https://crontab.guru) | ||
- cron: '9 * * * *' | ||
|
||
env: | ||
DBT_PROFILES_DIR: ./ | ||
|
||
ACCOUNT: "${{ vars.ACCOUNT }}" | ||
ROLE: "${{ vars.ROLE }}" | ||
USER: "${{ vars.USER }}" | ||
PASSWORD: "${{ secrets.PASSWORD }}" | ||
REGION: "${{ vars.REGION }}" | ||
DATABASE: "${{ vars.DATABASE }}" | ||
WAREHOUSE: "${{ vars.WAREHOUSE }}" | ||
SCHEMA: "${{ vars.SCHEMA }}" | ||
|
||
concurrency: | ||
group: ${{ github.workflow }} | ||
|
||
jobs: | ||
run_dbt_jobs: | ||
runs-on: ubuntu-latest | ||
environment: | ||
name: workflow_prod_2xl | ||
|
||
steps: | ||
- uses: actions/checkout@v3 | ||
|
||
- uses: actions/setup-python@v4 | ||
with: | ||
python-version: "3.10" | ||
cache: "pip" | ||
|
||
- name: install dependencies | ||
run: | | ||
pip install -r requirements.txt | ||
dbt deps | ||
- name: Run DBT Jobs | ||
run: | | ||
dbt run -m "gnosis_models,tag:traces2" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
-- depends_on: {{ ref('bronze__streamline_traces') }} | ||
{{ config ( | ||
materialized = "incremental", | ||
incremental_strategy = 'delete+insert', | ||
unique_key = "block_number", | ||
cluster_by = ['modified_timestamp::DATE','partition_key'], | ||
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", | ||
tags = ['traces2'] | ||
) }} | ||
|
||
WITH bronze_traces AS ( | ||
|
||
SELECT | ||
block_number, | ||
_partition_by_block_id AS partition_key, | ||
VALUE :array_index :: INT AS tx_position, | ||
DATA :result AS full_traces, | ||
_inserted_timestamp | ||
FROM | ||
|
||
{% if is_incremental() %} | ||
{{ ref('bronze__streamline_FR_traces') }} | ||
WHERE | ||
_partition_by_block_id BETWEEN ( | ||
SELECT | ||
MAX(partition_key) | ||
FROM | ||
{{ this }} | ||
) | ||
AND ( | ||
SELECT | ||
MAX(partition_key) + 1000000 | ||
FROM | ||
{{ this }} | ||
) | ||
AND DATA :result IS NOT NULL | ||
{% else %} | ||
{{ ref('bronze__streamline_FR_traces') }} | ||
WHERE | ||
_partition_by_block_id <= 5000000 | ||
AND DATA :result IS NOT NULL | ||
{% endif %} | ||
|
||
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position | ||
ORDER BY | ||
_inserted_timestamp DESC)) = 1 | ||
), | ||
flatten_traces AS ( | ||
SELECT | ||
block_number, | ||
tx_position, | ||
partition_key, | ||
IFF( | ||
path IN ( | ||
'result', | ||
'result.value', | ||
'result.type', | ||
'result.to', | ||
'result.input', | ||
'result.gasUsed', | ||
'result.gas', | ||
'result.from', | ||
'result.output', | ||
'result.error', | ||
'result.revertReason', | ||
'gasUsed', | ||
'gas', | ||
'type', | ||
'to', | ||
'from', | ||
'value', | ||
'input', | ||
'error', | ||
'output', | ||
'revertReason' | ||
), | ||
'ORIGIN', | ||
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') | ||
) AS trace_address, | ||
_inserted_timestamp, | ||
OBJECT_AGG( | ||
key, | ||
VALUE | ||
) AS trace_json, | ||
CASE | ||
WHEN trace_address = 'ORIGIN' THEN NULL | ||
WHEN POSITION( | ||
'_' IN trace_address | ||
) = 0 THEN 'ORIGIN' | ||
ELSE REGEXP_REPLACE( | ||
trace_address, | ||
'_[0-9]+$', | ||
'', | ||
1, | ||
1 | ||
) | ||
END AS parent_trace_address, | ||
SPLIT( | ||
trace_address, | ||
'_' | ||
) AS trace_address_array | ||
FROM | ||
bronze_traces txs, | ||
TABLE( | ||
FLATTEN( | ||
input => PARSE_JSON( | ||
txs.full_traces | ||
), | ||
recursive => TRUE | ||
) | ||
) f | ||
WHERE | ||
f.index IS NULL | ||
AND f.key != 'calls' | ||
AND f.path != 'result' | ||
GROUP BY | ||
block_number, | ||
tx_position, | ||
partition_key, | ||
trace_address, | ||
_inserted_timestamp | ||
) | ||
SELECT | ||
block_number, | ||
tx_position, | ||
trace_address, | ||
parent_trace_address, | ||
trace_address_array, | ||
trace_json, | ||
partition_key, | ||
_inserted_timestamp, | ||
{{ dbt_utils.generate_surrogate_key( | ||
['block_number', 'tx_position', 'trace_address'] | ||
) }} AS traces_id, | ||
SYSDATE() AS inserted_timestamp, | ||
SYSDATE() AS modified_timestamp, | ||
'{{ invocation_id }}' AS _invocation_id | ||
FROM | ||
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id | ||
ORDER BY | ||
_inserted_timestamp DESC)) = 1 |