upgrades (#171)
* upgrades

* quotes

* config
austinFlipside authored Nov 19, 2024
1 parent 21c6997 commit 9aa3368
Showing 139 changed files with 269 additions and 1,613 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dbt_run_dev_refresh.yml
@@ -4,8 +4,8 @@ run-name: dbt_run_dev_refresh
 on:
   workflow_dispatch:
   schedule:
-    # Runs "at 9:00 UTC" (see https://crontab.guru)
-    - cron: '0 9 * * *'
+    # Runs "at 6:30 UTC on Mondays" (see https://crontab.guru)
+    - cron: '30 6 * * 1'

 env:
   DBT_PROFILES_DIR: ./
@@ -1,12 +1,11 @@
-name: dbt_run_streamline_decoder_history
-run-name: dbt_run_streamline_decoder_history
+name: dbt_run_scheduled_decoded_logs_history_user_abis
+run-name: dbt_run_scheduled_decoded_logs_history_user_abis

 on:
   workflow_dispatch:
-  schedule:
-    # Runs "at 10:10 UTC" (see https://crontab.guru)
-    - cron: '10 10 * * *'
-
     branches:
       - "main"

 env:
   DBT_PROFILES_DIR: ./

@@ -22,12 +21,10 @@ env:
 concurrency:
   group: ${{ github.workflow }}

-
-
 jobs:
   run_dbt_jobs:
     runs-on: ubuntu-latest
     environment:
       name: workflow_prod

     steps:
@@ -42,6 +39,7 @@ jobs:
       run: |
         pip install -r requirements.txt
         dbt deps
-    - name: Run DBT Jobs
+    - name: Kick off decoded logs history, if there are new ABIs from users
       run: |
         dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "gnosis_models,tag:streamline_decoded_logs_complete" "gnosis_models,tag:streamline_decoded_logs_history"
+        dbt run-operation run_decoded_logs_history
49 changes: 49 additions & 0 deletions .github/workflows/dbt_run_streamline_decoded_logs_history.yml
@@ -0,0 +1,49 @@
name: dbt_run_streamline_decoded_logs_history
run-name: dbt_run_streamline_decoded_logs_history

on:
workflow_dispatch:
branches:
- "main"

env:
DBT_PROFILES_DIR: ./

ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
group: ${{ github.workflow }}

jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"

- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Update complete table
run: |
dbt run -m "gnosis_models,tag:streamline_decoded_logs_complete"
- name: Decode historical logs
run: |
dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}'
4 changes: 3 additions & 1 deletion data/github_actions__workflows.csv
@@ -3,4 +3,6 @@ dbt_run_scheduled_non_realtime,"10,40 * * * *"
 dbt_run_streamline_chainhead,"0,30 * * * *"
 dbt_run_streamline_decoder,"20,50 * * * *"
 dbt_run_scheduled_curated,"30 * * * *"
-dbt_test_tasks,"25 * * * *"
+dbt_test_tasks,"25 * * * *"
+dbt_run_streamline_decoded_logs_history,"25 17 * * 6"
+dbt_run_scheduled_decoded_logs_history_user_abis,"49 23 * * *"
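For context: the new workflow file above has only a workflow_dispatch trigger, so these seed rows presumably drive the external scheduling of the workflows — 17:25 UTC on Saturdays for the weekly history run, 23:49 UTC daily for the user-ABI check. A minimal lookup sketch follows, assuming the CSV loads as a seed table named github_actions__workflows with columns workflow_name and cron_schedule; the header row sits outside the visible hunk, so those column names are assumptions.

-- Hypothetical query against the seeded table; column names are assumed.
SELECT workflow_name, cron_schedule
FROM github_actions__workflows
WHERE workflow_name IN (
    'dbt_run_streamline_decoded_logs_history',           -- '25 17 * * 6' = Saturdays 17:25 UTC
    'dbt_run_scheduled_decoded_logs_history_user_abis'   -- '49 23 * * *' = daily 23:49 UTC
);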
126 changes: 126 additions & 0 deletions macros/decoder/decoded_logs_history.sql
@@ -0,0 +1,126 @@
{% macro decoded_logs_history(backfill_mode=false) %}

{%- set params = {
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
"producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000),
"producer_limit_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
} -%}
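{# Each limit above can be overridden at invocation time via --vars,
   e.g. --vars '{"DECODED_LOGS_HISTORY_SQL_LIMIT": 1000000}'.
   Note that producer_limit_size reads the same
   DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE variable as worker_batch_size. #}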

{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}

{% set find_months_query %}
SELECT
DISTINCT date_trunc('month', block_timestamp)::date as month
FROM {{ ref('core__fact_blocks') }}
ORDER BY month ASC
{% endset %}

{% set results = run_query(find_months_query) %}
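{# run_query returns a populated result table only during the execution phase,
   hence the execute guard around everything that consumes it. #}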

{% if execute %}
{% set months = results.columns[0].values() %}

{% for month in months %}
{% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}

{% set create_view_query %}
create or replace view streamline.{{view_name}} as (
WITH target_blocks AS (
SELECT
block_number
FROM {{ ref('core__fact_blocks') }}
WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp
),
new_abis AS (
SELECT
abi,
parent_contract_address,
event_signature,
start_block,
end_block
FROM {{ ref('silver__complete_event_abis') }}
{% if not backfill_mode %}
WHERE inserted_timestamp > dateadd('day', -30, sysdate())
{% endif %}
),
existing_logs_to_exclude AS (
SELECT _log_id
FROM {{ ref('streamline__complete_decode_logs') }} l
INNER JOIN target_blocks b using (block_number)
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
concat(l.tx_hash::string, '-', l.event_index::string) as _log_id
FROM target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }} l using (block_number)
WHERE l.tx_status = 'SUCCESS' and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp
)
SELECT
l.block_number,
l._log_id,
A.abi,
OBJECT_CONSTRUCT(
'topics', l.topics,
'data', l.data,
'address', l.contract_address
) AS data
FROM candidate_logs l
INNER JOIN new_abis A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]::STRING
AND l.block_number BETWEEN A.start_block AND A.end_block
WHERE NOT EXISTS (
SELECT 1
FROM existing_logs_to_exclude e
WHERE e._log_id = l._log_id
)
LIMIT {{ params.sql_limit }}
)
{% endset %}

{# Create the view #}
{% do run_query(create_view_query) %}
{{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}

{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
{# Check if rows exist first #}
{% set check_rows_query %}
SELECT EXISTS(SELECT 1 FROM streamline.{{view_name}} LIMIT 1)
{% endset %}

{% set results = run_query(check_rows_query) %}
{% set has_rows = results.columns[0].values()[0] %}

{% if has_rows %}
{# Invoke streamline since rows exist to decode #}
{% set decode_query %}
SELECT streamline.udf_bulk_decode_logs(
object_construct(
'sql_source', '{{view_name}}',
'producer_batch_size', {{ params.producer_batch_size }},
'producer_limit_size', {{ params.producer_limit_size }})
);
{% endset %}

{% do run_query(decode_query) %}
{{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}

{# Call wait since we actually did some decoding #}
{% do run_query("call system$wait(" ~ wait_time ~ ")") %}
{{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{% else %}
{{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }}
{% endif %}
{% endif %}

{% endfor %}
{% endif %}

{% endmacro %}
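For reference, the "Decode historical logs" step in the new workflow invokes this macro with dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}'. For each monthly view that has rows, the macro then issues a call of the following shape; the view name below is illustrative and the batch sizes are the defaults from the params block.

-- Sketch of the per-month invocation the macro performs (illustrative view name):
SELECT streamline.udf_bulk_decode_logs(
    object_construct(
        'sql_source', 'decoded_logs_history_2024_01',
        'producer_batch_size', 400000,
        'producer_limit_size', 100000
    )
);
-- followed by the pause that throttles successive months:
CALL system$wait(60);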
29 changes: 29 additions & 0 deletions macros/decoder/run_decoded_logs_history.sql
@@ -0,0 +1,29 @@
{% macro run_decoded_logs_history() %}

{% set check_for_new_user_abis_query %}
select 1
from {{ ref('silver__user_verified_abis') }}
where _inserted_timestamp::date = sysdate()::date
and dayname(sysdate()) <> 'Sat'
{% endset %}

{% set results = run_query(check_for_new_user_abis_query) %}

{% if execute %}
{% set new_user_abis = results.columns[0].values() | length > 0 %}

{% if new_user_abis %}
{% set invoke_workflow_query %}
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'gnosis-models',
'dbt_run_streamline_decoded_logs_history.yml',
NULL
)
{% endset %}

{% do run_query(invoke_workflow_query) %}
{% endif %}
{% endif %}
{% endmacro %}
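Per the CSV above, this check presumably runs daily at 23:49 UTC; it dispatches the history workflow only when users registered new ABIs that day, and skips Saturdays, when the weekly scheduled history run already fires. The dispatch can be reproduced manually with the same UDF call used in the macro body:

-- Manual equivalent of the macro's dispatch step (mirrors the macro body above):
SELECT github_actions.workflow_dispatches(
    'FlipsideCrypto',
    'gnosis-models',
    'dbt_run_streamline_decoded_logs_history.yml',
    NULL
);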
2 changes: 2 additions & 0 deletions models/github_actions/github_actions__current_task_status.yml
@@ -7,6 +7,8 @@ models:
       - dbt_expectations.expect_column_values_to_be_in_set:
           value_set:
             - TRUE
+          config:
+            severity: warn
   - name: SUCCESSES
     tests:
       - dbt_expectations.expect_column_values_to_be_in_set:

Eight further files were deleted; their names and contents are collapsed in this view.
