From bdbe4f6e674ab87de6de83c1a3cb92163ac41361 Mon Sep 17 00:00:00 2001 From: Austin <93135983+austinFlipside@users.noreply.github.com> Date: Wed, 26 Jun 2024 12:38:41 -0400 Subject: [PATCH] traces2 (#154) * traces2 * not null --- .../workflows/dbt_run_temp_reload_traces.yml | 45 ++++++ models/silver/core/silver__traces2.sql | 141 ++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 .github/workflows/dbt_run_temp_reload_traces.yml create mode 100644 models/silver/core/silver__traces2.sql diff --git a/.github/workflows/dbt_run_temp_reload_traces.yml b/.github/workflows/dbt_run_temp_reload_traces.yml new file mode 100644 index 00000000..1214f0b7 --- /dev/null +++ b/.github/workflows/dbt_run_temp_reload_traces.yml @@ -0,0 +1,45 @@ +name: dbt_run_temp_reload_traces +run-name: dbt_run_temp_reload_traces + +on: + workflow_dispatch: + schedule: + # Runs “At minute 9,” (see https://crontab.guru) + - cron: '9 * * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod_2xl + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m "gnosis_models,tag:traces2" \ No newline at end of file diff --git a/models/silver/core/silver__traces2.sql b/models/silver/core/silver__traces2.sql new file mode 100644 index 00000000..6ff17498 --- /dev/null +++ b/models/silver/core/silver__traces2.sql @@ -0,0 +1,141 @@ +-- depends_on: {{ ref('bronze__streamline_traces') }} +{{ config ( + materialized = "incremental", + incremental_strategy = 'delete+insert', + unique_key = "block_number", + cluster_by = ['modified_timestamp::DATE','partition_key'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", + tags = ['traces2'] +) }} + +WITH bronze_traces AS ( + + SELECT + block_number, + _partition_by_block_id AS partition_key, + VALUE :array_index :: INT AS tx_position, + DATA :result AS full_traces, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_FR_traces') }} +WHERE + _partition_by_block_id BETWEEN ( + SELECT + MAX(partition_key) + FROM + {{ this }} + ) + AND ( + SELECT + MAX(partition_key) + 1000000 + FROM + {{ this }} + ) + AND DATA :result IS NOT NULL +{% else %} + {{ ref('bronze__streamline_FR_traces') }} +WHERE + _partition_by_block_id <= 5000000 + AND DATA :result IS NOT NULL +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position +ORDER BY + _inserted_timestamp DESC)) = 1 +), +flatten_traces AS ( + SELECT + block_number, + tx_position, + partition_key, + IFF( + path IN ( + 'result', + 'result.value', + 'result.type', + 'result.to', + 'result.input', + 'result.gasUsed', + 'result.gas', + 'result.from', + 'result.output', + 'result.error', + 'result.revertReason', + 'gasUsed', + 'gas', + 'type', + 'to', + 'from', + 'value', + 'input', + 'error', + 'output', + 'revertReason' + ), + 'ORIGIN', + REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') + ) AS trace_address, + _inserted_timestamp, + OBJECT_AGG( + key, + VALUE + ) AS trace_json, + CASE + WHEN trace_address = 'ORIGIN' THEN NULL + WHEN POSITION( + '_' IN trace_address + ) = 0 THEN 'ORIGIN' + ELSE REGEXP_REPLACE( + trace_address, + '_[0-9]+$', + '', + 1, + 1 + ) + END AS parent_trace_address, + SPLIT( + trace_address, + '_' + ) AS trace_address_array + FROM + bronze_traces txs, + TABLE( + FLATTEN( + input => PARSE_JSON( + txs.full_traces + ), + recursive => TRUE + ) + ) f + WHERE + f.index IS NULL + AND f.key != 'calls' + AND f.path != 'result' + GROUP BY + block_number, + tx_position, + partition_key, + trace_address, + _inserted_timestamp +) +SELECT + block_number, + tx_position, + trace_address, + parent_trace_address, + trace_address_array, + trace_json, + partition_key, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['block_number', 'tx_position', 'trace_address'] + ) }} AS traces_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id +ORDER BY + _inserted_timestamp DESC)) = 1