Commit: batch insert for wormhole

vrtnd committed Jan 20, 2025
1 parent d0bd836 commit 78e3edf
Showing 2 changed files with 123 additions and 73 deletions.
121 changes: 48 additions & 73 deletions src/handlers/runWormhole.ts
@@ -1,14 +1,14 @@
 import { wrapScheduledLambda } from "../utils/wrap";
 import adapter, { fetchWormholeEvents, normalizeChainName } from "../adapters/wormhole";
 import { sql } from "../utils/db";
-import { insertTransactionRow } from "../utils/wrappa/postgres/write";
+import { insertTransactionRows } from "../utils/wrappa/postgres/write";
 import { getBridgeID } from "../utils/wrappa/postgres/query";
 import dayjs from "dayjs";
 import { insertConfigEntriesForAdapter } from "../utils/adapter";
 export const handler = async () => {
   try {
     await insertConfigEntriesForAdapter(adapter, "wormhole");
-    const startTs = dayjs().subtract(12, "hour").unix();
+    const startTs = dayjs().subtract(1, "days").unix();
     const endTs = dayjs().unix();
     const bridgeIds = Object.fromEntries(
       await Promise.all(
@@ -25,12 +25,14 @@ export const handler = async () => {
         .format("YYYY-MM-DD HH:mm:ss")}) to ${endTs} (${dayjs.unix(endTs).format("YYYY-MM-DD HH:mm:ss")})`
     );
     const events = await fetchWormholeEvents(startTs, endTs);
-    const BATCH_SIZE = 500;
+    const usdVolume = events.reduce((acc, event) => acc + (Number(event.token_usd_amount) || 0), 0);
+    console.log(`Total USD volume: ${usdVolume * 2}`);
 
-    const processBatch = async (sql: any, batch: any[]) => {
-      const insertPromises: Promise<void>[] = [];
+    // Prepare all rows before starting transactions
+    const prepareRows = (events: any[]) => {
+      const rows: any[] = [];
 
-      for (const event of batch) {
+      for (const event of events) {
         const {
           block_timestamp,
           transaction_hash,
@@ -46,82 +48,55 @@
         const destinationChain = normalizeChainName(destination_chain);
 
         if (bridgeIds[sourceChain]) {
-          try {
-            insertPromises.push(
-              insertTransactionRow(
-                sql,
-                true,
-                {
-                  bridge_id: bridgeIds[sourceChain],
-                  chain: sourceChain,
-                  tx_hash: transaction_hash,
-                  ts: parseInt(block_timestamp) * 1000,
-                  tx_block: null,
-                  tx_from: token_transfer_from_address ?? "0x",
-                  tx_to: token_transfer_to_address ?? "0x",
-                  token: token_address ?? "0x0000000000000000000000000000000000000000",
-                  amount: token_usd_amount || "0",
-                  is_deposit: true,
-                  is_usd_volume: true,
-                  txs_counted_as: 1,
-                  origin_chain: null,
-                },
-                "upsert"
-              )
-            );
-          } catch (error) {
-            console.error(`Error inserting Wormhole event: ${error}`, event);
-          }
+          rows.push({
+            bridge_id: bridgeIds[sourceChain],
+            chain: sourceChain,
+            tx_hash: transaction_hash,
+            ts: parseInt(block_timestamp) * 1000,
+            tx_block: null,
+            tx_from: token_transfer_from_address ?? "0x",
+            tx_to: token_transfer_to_address ?? "0x",
+            token: token_address ?? "0x0000000000000000000000000000000000000000",
+            amount: token_usd_amount || "0",
+            is_deposit: true,
+            is_usd_volume: true,
+            txs_counted_as: 1,
+            origin_chain: null,
+          });
         }
 
         if (bridgeIds[destinationChain]) {
-          try {
-            insertPromises.push(
-              insertTransactionRow(
-                sql,
-                true,
-                {
-                  bridge_id: bridgeIds[destinationChain],
-                  chain: destinationChain,
-                  tx_hash: `${transaction_hash}_destination`,
-                  ts: parseInt(block_timestamp) * 1000,
-                  tx_block: null,
-                  tx_from: token_transfer_to_address ?? "0x",
-                  tx_to: token_transfer_from_address ?? "0x",
-                  token: token_address ?? "0x0000000000000000000000000000000000000000",
-                  amount: token_usd_amount || "0",
-                  is_deposit: false,
-                  is_usd_volume: true,
-                  txs_counted_as: 1,
-                  origin_chain: null,
-                },
-                "upsert"
-              )
-            );
-          } catch (error) {
-            console.error(`Error inserting Wormhole event: ${error}`, event);
-          }
+          rows.push({
+            bridge_id: bridgeIds[destinationChain],
+            chain: destinationChain,
+            tx_hash: `${transaction_hash}_destination`,
+            ts: parseInt(block_timestamp) * 1000,
+            tx_block: null,
+            tx_from: token_transfer_to_address ?? "0x",
+            tx_to: token_transfer_from_address ?? "0x",
+            token: token_address ?? "0x0000000000000000000000000000000000000000",
+            amount: token_usd_amount || "0",
+            is_deposit: false,
+            is_usd_volume: true,
+            txs_counted_as: 1,
+            origin_chain: null,
+          });
         }
       }
-
-      await Promise.all(insertPromises);
-      console.log(`Inserted ${insertPromises.length} of ${events.length} Wormhole events`);
+      return rows;
     };
 
-    let start = 0;
-    let end = events.length;
-
-    while (start < end) {
-      await sql.begin(async (sql) => {
-        await processBatch(sql, events.slice(start, start + BATCH_SIZE));
-      });
+    const allRows = prepareRows(events);
+    console.log(`Prepared ${allRows.length} rows for insertion`);
 
-      await sql.begin(async (sql) => {
-        await processBatch(sql, events.slice(end - BATCH_SIZE, end));
+    try {
+      await sql.begin(async (trx) => {
+        await insertTransactionRows(trx, true, allRows);
       });
-
-      start += BATCH_SIZE;
-      end -= BATCH_SIZE;
+      console.log(`Successfully inserted ${allRows.length} Wormhole events`);
+    } catch (error) {
+      console.error("Failed to insert Wormhole events:", error);
+      throw error;
     }
   } catch (error) {
     console.error("Error processing Wormhole events:", error);
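Note: the handler now follows a prepare-then-flush shape. All rows are built up front with prepareRows, then written in a single transaction via insertTransactionRows (added in src/utils/wrappa/postgres/write.ts below). A minimal sketch of that calling pattern, assuming the repo's postgres.js client from src/utils/db; the flushRows wrapper is an illustrative name, not part of this commit:

// Sketch of the prepare-then-flush pattern used by the new handler code.
// Assumes the repo's postgres.js `sql` client and the insertTransactionRows
// helper below; `flushRows` is an illustrative name, not part of this commit.
import { sql } from "../utils/db";
import { insertTransactionRows } from "../utils/wrappa/postgres/write";

type TxRow = Parameters<typeof insertTransactionRows>[2][number];

export const flushRows = async (rows: TxRow[]): Promise<void> => {
  if (rows.length === 0) return;
  await sql.begin(async (trx) => {
    // One transaction for the whole run; chunking and retrying happen inside the helper.
    await insertTransactionRows(trx, true, rows);
  });
};

Compared with the old two-pointer while loop, which opened a fresh transaction per 500-event slice from both ends of the array, a single transaction avoids partially inserted runs and the possibility of overlapping slices near the middle.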
75 changes: 75 additions & 0 deletions src/utils/wrappa/postgres/write.ts
@@ -288,3 +288,78 @@ export const insertOrUpdateTokenWithoutPrice = async (token: string, symbol: str
     console.error(`Could not insert or update token without price: ${token}`, e);
   }
 };
+
+export const insertTransactionRows = async (
+  sql: postgres.TransactionSql<{}>,
+  allowNullTxValues: boolean,
+  rows: {
+    bridge_id: string;
+    chain: string;
+    tx_hash: string | null;
+    ts: number;
+    tx_block: number | null;
+    tx_from: string | null;
+    tx_to: string | null;
+    token: string;
+    amount: string;
+    is_deposit: boolean;
+    is_usd_volume: boolean;
+    txs_counted_as: number | null;
+    origin_chain: string | null;
+  }[]
+) => {
+  if (rows.length === 0) return;
+
+  rows.forEach((params) => {
+    Object.entries(params).forEach(([key, val]) => {
+      if (val == null) {
+        if (!allowNullTxValues) {
+          throw new Error(`Transaction for bridgeID ${params.bridge_id} has a null value for ${key}.`);
+        }
+      } else if (typeof val !== txTypes[key]) {
+        throw new Error(
+          `Transaction for bridgeID ${params.bridge_id} has ${typeof val} for ${key} when it must be ${txTypes[key]}.`
+        );
+      }
+    });
+  });
+
+  const chunkSize = 500;
+  for (let i = 0; i < rows.length; i += chunkSize) {
+    const chunk = rows.slice(i, i + chunkSize);
+
+    for (let retryCount = 0; retryCount < 5; retryCount++) {
+      try {
+        await sql`
+          INSERT INTO bridges.transactions ${sql(chunk)}
+          ON CONFLICT (bridge_id, chain, tx_hash, token, tx_from, tx_to)
+          DO UPDATE SET
+            ts = EXCLUDED.ts,
+            tx_block = COALESCE(EXCLUDED.tx_block, bridges.transactions.tx_block),
+            amount = CASE
+              WHEN EXCLUDED.amount::numeric > bridges.transactions.amount::numeric
+              THEN EXCLUDED.amount
+              ELSE bridges.transactions.amount
+            END,
+            is_deposit = EXCLUDED.is_deposit,
+            is_usd_volume = EXCLUDED.is_usd_volume,
+            txs_counted_as = COALESCE(EXCLUDED.txs_counted_as, bridges.transactions.txs_counted_as),
+            origin_chain = COALESCE(EXCLUDED.origin_chain, bridges.transactions.origin_chain)
+          WHERE
+            EXCLUDED.ts >= bridges.transactions.ts OR
+            EXCLUDED.amount::numeric > bridges.transactions.amount::numeric
+        `;
+        break;
+      } catch (e) {
+        if (retryCount >= 4) {
+          console.error("Failed chunk:", chunk[0], "Error:", e);
+          throw new Error(`Could not insert transaction rows in bulk after 5 retries.`);
+        } else {
+          console.error("Bulk insert error, retrying:", e);
+          await new Promise((resolve) => setTimeout(resolve, 1000 * (retryCount + 1))); // Linear backoff: wait 1s, 2s, 3s, ...
+          continue;
+        }
+      }
+    }
+  }
+};
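
For reference, a hypothetical call site for the new helper. Every row value below is invented for illustration, and the ON CONFLICT target is assumed to match an existing unique index on bridges.transactions:

// Hypothetical usage of insertTransactionRows; all row values are illustrative.
import { sql } from "../utils/db";
import { insertTransactionRows } from "../utils/wrappa/postgres/write";

export const insertExampleRow = async () => {
  await sql.begin(async (trx) => {
    await insertTransactionRows(trx, true, [
      {
        bridge_id: "00000000-0000-0000-0000-000000000000", // illustrative id
        chain: "ethereum",
        tx_hash: "0xabc", // illustrative hash
        ts: Date.now(),
        tx_block: null,
        tx_from: "0x",
        tx_to: "0x",
        token: "0x0000000000000000000000000000000000000000",
        amount: "0",
        is_deposit: true,
        is_usd_volume: true,
        txs_counted_as: 1,
        origin_chain: null,
      },
    ]);
  });
};

Because the DO UPDATE is guarded by EXCLUDED.ts >= bridges.transactions.ts OR EXCLUDED.amount::numeric > bridges.transactions.amount::numeric, re-inserting an already-stored window rewrites rows with identical values, so reruns are effectively idempotent.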
