add tokens query, change wormhole job
vrtnd committed Dec 3, 2024
1 parent 1ee60cf commit 319932b
Showing 4 changed files with 139 additions and 75 deletions.
3 changes: 3 additions & 0 deletions sql/data.sql
@@ -138,3 +138,6 @@ CREATE TABLE IF NOT EXISTS bridges.hourly_volume (

CREATE INDEX IF NOT EXISTS hourly_volume_ts ON bridges.hourly_volume (ts);
CREATE INDEX IF NOT EXISTS hourly_volume_chain ON bridges.hourly_volume (chain);

CREATE INDEX IF NOT EXISTS hourly_volume_bridge_ts_chain ON bridges.hourly_volume (bridge_id, ts, chain);
CREATE INDEX IF NOT EXISTS config_bridge_name_id ON bridges.config (bridge_name, id);
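The two new indexes line up with the query work in this commit: (bridge_id, ts, chain) covers range scans over bridges.hourly_volume for a single bridge, and (bridge_name, id) lets the bridges.config lookup added below filter on bridge_name while supplying the id used in the join. A minimal sketch (not part of this commit) of the kind of lookup the composite index serves, assuming the shared postgres.js `sql` client used elsewhere in the repo; only the indexed columns are referenced, since the full hourly_volume schema is not shown in this diff:

import { sql } from "../utils/db"; // hypothetical path — any shared postgres.js client instance

// Range scan the (bridge_id, ts, chain) index can satisfy: fix bridge_id,
// bound ts, and read chain straight from the index entries.
const hourlyVolumeWindow = async (bridgeId: string, startTs: number, endTs: number) => {
  return await sql<{ ts: Date; chain: string }[]>`
    SELECT ts, chain
    FROM bridges.hourly_volume
    WHERE bridge_id = ${bridgeId}
      AND ts >= to_timestamp(${startTs})
      AND ts <= to_timestamp(${endTs})
    ORDER BY ts;
  `;
};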
8 changes: 6 additions & 2 deletions src/handlers/getBridgeStatsOnDay.ts
@@ -1,7 +1,11 @@
import { IResponse, successResponse, errorResponse } from "../utils/lambda-response";
import wrap from "../utils/wrap";
import { getCurrentUnixTimestamp, getTimestampAtStartOfDay } from "../utils/date";
import { queryAggregatedDailyTimestampRange, queryConfig } from "../utils/wrappa/postgres/query";
import {
queryAggregatedDailyTimestampRange,
queryAggregatedTokensInRange,
queryConfig,
} from "../utils/wrappa/postgres/query";
import { getLlamaPrices } from "../utils/prices";
import { importBridgeNetwork } from "../data/importBridgeNetwork";
import BigNumber from "bignumber.js";
@@ -128,7 +132,7 @@ const getBridgeStatsOnDay = async (timestamp: string = "0", chain: string, bridg
sourceChainsDailyData = [...sourceChainData, ...sourceChainsDailyData];
})
);
const dailyData = await queryAggregatedDailyTimestampRange(queryTimestamp, endTimestamp, queryChain, bridgeDbName);
const dailyData = await queryAggregatedTokensInRange(queryTimestamp, endTimestamp, queryChain, bridgeDbName);
let dailyTokensDeposited = {} as TokenRecord;
let dailyTokensWithdrawn = {} as TokenRecord;
let dailyAddressesDeposited = {} as AddressRecord;
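With this change the handler fetches the token/address breakdown for the window from queryAggregatedTokensInRange (added in query.ts below) instead of reusing queryAggregatedDailyTimestampRange, then folds the returned rows into the TokenRecord/AddressRecord maps declared above. A hedged sketch of that consumption; the per-element parsing is left abstract because the array format of columns like total_tokens_deposited is not visible in this diff:

const dailyData = await queryAggregatedTokensInRange(queryTimestamp, endTimestamp, queryChain, bridgeDbName);
for (const row of dailyData) {
  for (const entry of row.total_tokens_deposited ?? []) {
    // parse `entry` into a token key and amount here (exact format not shown in this commit),
    // then accumulate into dailyTokensDeposited
  }
  // total_tokens_withdrawn and the total_address_* columns are folded the same way
}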
159 changes: 86 additions & 73 deletions src/handlers/runWormhole.ts
@@ -27,88 +27,101 @@ export const handler = async () => {
const events = await fetchWormholeEvents(startTs, endTs);
const BATCH_SIZE = 500;

for (let i = 0; i < events.length; i += BATCH_SIZE) {
await sql.begin(async (sql) => {
const batch = events.slice(i, i + BATCH_SIZE);
const insertPromises: Promise<void>[] = [];
const processBatch = async (sql: any, batch: any[]) => {
const insertPromises: Promise<void>[] = [];

for (const event of batch) {
const {
block_timestamp,
transaction_hash,
token_transfer_from_address,
token_transfer_to_address,
token_address,
token_usd_amount,
source_chain,
destination_chain,
} = event;
for (const event of batch) {
const {
block_timestamp,
transaction_hash,
token_transfer_from_address,
token_transfer_to_address,
token_address,
token_usd_amount,
source_chain,
destination_chain,
} = event;

const sourceChain = normalizeChainName(source_chain);
const destinationChain = normalizeChainName(destination_chain);
const sourceChain = normalizeChainName(source_chain);
const destinationChain = normalizeChainName(destination_chain);

if (bridgeIds[sourceChain]) {
try {
insertPromises.push(
insertTransactionRow(
sql,
true,
{
bridge_id: bridgeIds[sourceChain],
chain: sourceChain,
tx_hash: transaction_hash,
ts: parseInt(block_timestamp) * 1000,
tx_block: null,
tx_from: token_transfer_from_address ?? "0x",
tx_to: token_transfer_to_address ?? "0x",
token: token_address ?? "0x0000000000000000000000000000000000000000",
amount: token_usd_amount || "0",
is_deposit: true,
is_usd_volume: true,
txs_counted_as: 1,
origin_chain: null,
},
"upsert"
)
);
} catch (error) {
console.error(`Error inserting Wormhole event: ${error}`, event);
}
if (bridgeIds[sourceChain]) {
try {
insertPromises.push(
insertTransactionRow(
sql,
true,
{
bridge_id: bridgeIds[sourceChain],
chain: sourceChain,
tx_hash: transaction_hash,
ts: parseInt(block_timestamp) * 1000,
tx_block: null,
tx_from: token_transfer_from_address ?? "0x",
tx_to: token_transfer_to_address ?? "0x",
token: token_address ?? "0x0000000000000000000000000000000000000000",
amount: token_usd_amount || "0",
is_deposit: true,
is_usd_volume: true,
txs_counted_as: 1,
origin_chain: null,
},
"upsert"
)
);
} catch (error) {
console.error(`Error inserting Wormhole event: ${error}`, event);
}
}

if (bridgeIds[destinationChain]) {
try {
insertPromises.push(
insertTransactionRow(
sql,
true,
{
bridge_id: bridgeIds[destinationChain],
chain: destinationChain,
tx_hash: `${transaction_hash}_destination`,
ts: parseInt(block_timestamp) * 1000,
tx_block: null,
tx_from: token_transfer_to_address ?? "0x",
tx_to: token_transfer_from_address ?? "0x",
token: token_address ?? "0x0000000000000000000000000000000000000000",
amount: token_usd_amount || "0",
is_deposit: false,
is_usd_volume: true,
txs_counted_as: 1,
origin_chain: null,
},
"upsert"
)
);
} catch (error) {
console.error(`Error inserting Wormhole event: ${error}`, event);
}
if (bridgeIds[destinationChain]) {
try {
insertPromises.push(
insertTransactionRow(
sql,
true,
{
bridge_id: bridgeIds[destinationChain],
chain: destinationChain,
tx_hash: `${transaction_hash}_destination`,
ts: parseInt(block_timestamp) * 1000,
tx_block: null,
tx_from: token_transfer_to_address ?? "0x",
tx_to: token_transfer_from_address ?? "0x",
token: token_address ?? "0x0000000000000000000000000000000000000000",
amount: token_usd_amount || "0",
is_deposit: false,
is_usd_volume: true,
txs_counted_as: 1,
origin_chain: null,
},
"upsert"
)
);
} catch (error) {
console.error(`Error inserting Wormhole event: ${error}`, event);
}
}
}

await Promise.all(insertPromises);
console.log(`Inserted ${insertPromises.length} of ${events.length} Wormhole events`);
};

await Promise.all(insertPromises);
console.log(`Inserted ${insertPromises.length} of ${events.length} Wormhole events`);
let start = 0;
let end = events.length;

while (start < end) {
await sql.begin(async (sql) => {
await processBatch(sql, events.slice(start, start + BATCH_SIZE));
});

await sql.begin(async (sql) => {
await processBatch(sql, events.slice(end - BATCH_SIZE, end));
});

start += BATCH_SIZE;
end -= BATCH_SIZE;
}
} catch (error) {
console.error("Error processing Wormhole events:", error);
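The refactor pulls the per-event insert logic into processBatch and replaces the single forward loop with a two-ended walk: each iteration commits one batch from the front of events and one from the back, each in its own sql.begin transaction. Note that when events.length is not a multiple of 2 × BATCH_SIZE the front and back windows can overlap in the middle, so some events are submitted twice; that appears to be tolerated because rows are written with the "upsert" mode. A small sketch (not part of the commit) that reproduces the slice bounds the loop generates:

// Reproduces the [from, to) arguments the new while-loop passes to events.slice.
const sliceWindows = (total: number, batchSize: number): Array<[number, number]> => {
  const out: Array<[number, number]> = [];
  let start = 0;
  let end = total;
  while (start < end) {
    out.push([start, start + batchSize]); // front batch
    out.push([end - batchSize, end]);     // back batch
    start += batchSize;
    end -= batchSize;
  }
  return out;
};

// sliceWindows(1800, 500) -> [[0,500],[1300,1800],[500,1000],[800,1300]]
// The last two windows overlap on [800,1000), so those 200 events are processed twice.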
44 changes: 44 additions & 0 deletions src/utils/wrappa/postgres/query.ts
@@ -387,6 +387,49 @@ const getNetflows = async (period: TimePeriod) => {
`;
};

const queryAggregatedTokensInRange = async (
startTimestamp: number,
endTimestamp: number,
chain?: string,
bridgeNetworkName?: string
) => {
let conditions = sql`WHERE ha.ts >= to_timestamp(${startTimestamp})::date
AND ha.ts <= to_timestamp(${endTimestamp})::date`;

if (chain) {
conditions = sql`${conditions} AND c.chain = ${chain}`;
}

if (bridgeNetworkName) {
conditions = sql`${conditions} AND c.bridge_name = ${bridgeNetworkName}`;
}

return await sql<
{
bridge_id: string;
ts: Date;
total_tokens_deposited: string[];
total_tokens_withdrawn: string[];
total_address_deposited: string[];
total_address_withdrawn: string[];
}[]
>`
SELECT
ha.bridge_id,
ha.ts,
ha.total_tokens_deposited,
ha.total_tokens_withdrawn,
ha.total_address_deposited,
ha.total_address_withdrawn
FROM
bridges.hourly_aggregated ha
JOIN
bridges.config c ON ha.bridge_id = c.id
${conditions}
ORDER BY ts;
`;
};

export {
getBridgeID,
getConfigsWithDestChain,
@@ -401,4 +444,5 @@ export {
queryAggregatedHourlyTimestampRange,
getLast24HVolume,
getNetflows,
queryAggregatedTokensInRange,
};
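A hedged usage sketch for the new export, mirroring the call added in getBridgeStatsOnDay.ts above: the timestamps are unix seconds, and chain / bridgeNetworkName are optional filters applied against bridges.config. The import path, chain, and bridge-network values below are illustrative only.

import { queryAggregatedTokensInRange } from "./utils/wrappa/postgres/query"; // path relative to src, hypothetical caller

const main = async () => {
  // One-day window (end - start = 86400 seconds); filter values are examples.
  const rows = await queryAggregatedTokensInRange(1733097600, 1733184000, "ethereum", "wormhole");
  for (const row of rows) {
    console.log(row.bridge_id, row.ts.toISOString(), row.total_tokens_deposited?.length ?? 0);
  }
};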
