Skip to content

Commit

Permalink
revert/beacon-validator-sl2 (#951)
Browse files Browse the repository at this point in the history
revert validators
  • Loading branch information
drethereum authored Sep 18, 2024
1 parent b900b34 commit fb7c984
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 88 deletions.
17 changes: 5 additions & 12 deletions models/beacon_chain/silver/silver__beacon_validators.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,9 @@
) }}

SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
VALUE :"block_number" :: INT
) AS block_number,
COALESCE(
VALUE :"STATE_ID" :: STRING,
metadata :request :"state_id" :: STRING
) AS state_id,
array_index AS INDEX,
block_number,
state_id,
INDEX,
array_index,
DATA :balance :: INTEGER / pow(
10,
Expand All @@ -38,7 +32,7 @@ SELECT
DATA :validator: withdrawal_credentials :: STRING AS withdrawal_credentials,
DATA :validator AS validator_details,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_number', 'index']) }} AS id,
id,
id AS beacon_validators_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
Expand All @@ -62,7 +56,6 @@ WHERE
DATA NOT ILIKE '%not found%'
AND DATA NOT ILIKE '%internal server error%'
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1
_inserted_timestamp DESC)) = 1
Original file line number Diff line number Diff line change
@@ -1,8 +1,55 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "beacon_validators_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false
) }}

WITH meta AS (

SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(
SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER
) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "beacon_validators") }}')
) A
)
SELECT
block_number,
state_id,
INDEX,
array_index,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_number', 'index', 'array_index']) }} AS id,
s._partition_by_block_id AS _partition_by_block_id,
DATA
FROM
{{ source(
'bronze_streamline',
'beacon_validators'
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"beacon_validators_v2",
"sql_limit" :"10",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('node_name','quicknode', 'sql_source','{{this.identifier}}', 'external_table','beacon_validators', 'route','validators', 'producer_batch_size', 10,'producer_limit_size', 100, 'worker_batch_size', 1, 'producer_batch_chunks_size', 1))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_beacon_history']
) }}

WITH to_do AS (

SELECT
slot_number,
state_id
Expand All @@ -29,22 +21,6 @@ WITH to_do AS (
)
SELECT
slot_number,
state_id,
ROUND(
slot_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || state_id || '/validators',
OBJECT_CONSTRUCT(
'accept',
'application/json'
),
NULL,
'vault/prod/ethereum/quicknode/mainnet'
) AS request
state_id
FROM
to_do
ORDER BY
slot_number DESC
to_do
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"beacon_validators_v2",
"sql_limit" :"10",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('node_name','quicknode', 'sql_source','{{this.identifier}}', 'external_table','beacon_validators', 'route','validators', 'producer_batch_size', 1,'producer_limit_size', 10, 'worker_batch_size', 1, 'producer_batch_chunks_size', 1))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_beacon_realtime']
) }}

WITH to_do AS (

SELECT
block_number AS slot_number,
state_id
Expand All @@ -26,38 +18,15 @@ WITH to_do AS (
state_id
FROM
{{ ref("streamline__complete_beacon_validators") }}
),
ready_slots AS (
SELECT
slot_number,
state_id
FROM
to_do
UNION
SELECT
slot_number,
state_id
FROM
{{ ref("_missing_validators") }}
)
SELECT
slot_number,
state_id,
ROUND(
slot_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || state_id || '/validators',
OBJECT_CONSTRUCT(
'accept',
'application/json'
),
NULL,
'vault/prod/ethereum/quicknode/mainnet'
) AS request
state_id
FROM
to_do
UNION
SELECT
slot_number,
state_id
FROM
ready_slots
ORDER BY
slot_number DESC
{{ ref("_missing_validators") }}

0 comments on commit fb7c984

Please sign in to comment.