diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index daf80ebc..4ef7d149 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -2,34 +2,34 @@ name: Deploy on: push: - branches: [ master ] - + branches: [master] + jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Get Node.js - uses: actions/setup-node@v1 - with: - node-version: '16' - - run: npm ci - - name: Type Check - run: npm run ts - - name: Deploy infrastructure stack - run: npm run deploy:prod - env: - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - PSQL_URL: ${{ secrets.PSQL_URL }} - PSQL_USERNAME: ${{ secrets.PSQL_USERNAME }} - PSQL_PW: ${{ secrets.PSQL_PW }} - BSC_RPC: ${{ secrets.BSC_RPC }} - CELO_RPC: ${{ secrets.CELO_RPC }} - ETHEREUM_RPC: ${{ secrets.ETHEREUM_RPC }} - OPTIMISM_RPC: ${{ secrets.OPTIMISM_RPC }} - AURORA_RPC: ${{ secrets.AURORA_RPC }} - ARBITRUM_RPC: ${{ secrets.ARBITRUM_RPC }} - SOLANA_RPC: ${{ secrets.SOLANA_RPC }} - RSK_ARCHIVAL_RPC: ${{ secrets.RSK_ARCHIVAL_RPC }} - DISCORD_WEBHOOK: ${{ secrets.DISCORD_WEBHOOK }} + - uses: actions/checkout@v2 + - name: Get Node.js + uses: actions/setup-node@v1 + with: + node-version: "16" + - run: npm ci + - name: Type Check + run: npm run ts + - name: Deploy infrastructure stack + run: npm run deploy:prod + env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + PSQL_URL: ${{ secrets.PSQL_URL }} + PSQL_USERNAME: ${{ secrets.PSQL_USERNAME }} + PSQL_PW: ${{ secrets.PSQL_PW }} + BSC_RPC: ${{ secrets.BSC_RPC }} + CELO_RPC: ${{ secrets.CELO_RPC }} + ETHEREUM_RPC: ${{ secrets.ETHEREUM_RPC }} + OPTIMISM_RPC: ${{ secrets.OPTIMISM_RPC }} + AURORA_RPC: ${{ secrets.AURORA_RPC }} + ARBITRUM_RPC: ${{ secrets.ARBITRUM_RPC }} + RSK_ARCHIVAL_RPC: ${{ secrets.RSK_ARCHIVAL_RPC }} + DISCORD_WEBHOOK: ${{ secrets.DISCORD_WEBHOOK }} + ALLIUM_API_KEY: ${{ secrets.ALLIUM_API_KEY }} diff --git a/package-lock.json b/package-lock.json index 3ac0886c..9f6dd16f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "axios": "^0.21.0", "axios-rate-limit": "^1.3.0", "bignumber.js": "^9.0.1", + "dayjs": "^1.11.13", "dotenv": "^8.2.0", "ethers": "^5", "graphql": "^16.0.0", @@ -4897,10 +4898,9 @@ } }, "node_modules/dayjs": { - "version": "1.11.4", - "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.4.tgz", - "integrity": "sha512-Zj/lPM5hOvQ1Bf7uAvewDaUcsJoI6JmNqmHhHl3nyumwe0XHwt8sWdOVAPACJzCebL8gQCi+K49w7iKWnGwX9g==", - "dev": true + "version": "1.11.13", + "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.13.tgz", + "integrity": "sha512-oaMBel6gjolK862uaPQOVTA7q3TZhuSvuMQAAglQDOWYO9A91IrAOUJEyKVlqJlHE0vq5p5UXxzdPfMH/x6xNg==" }, "node_modules/debug": { "version": "4.3.4", diff --git a/package.json b/package.json index 466362c5..85600c16 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "axios": "^0.21.0", "axios-rate-limit": "^1.3.0", "bignumber.js": "^9.0.1", + "dayjs": "^1.11.13", "dotenv": "^8.2.0", "ethers": "^5", "graphql": "^16.0.0", diff --git a/serverless.yml b/serverless.yml index 9ef86dbb..8ce27ae4 100644 --- a/serverless.yml +++ b/serverless.yml @@ -66,9 +66,9 @@ provider: OPTIMISM_RPC: ${env:OPTIMISM_RPC} AURORA_RPC: ${env:AURORA_RPC} ARBITRUM_RPC: ${env:ARBITRUM_RPC} - SOLANA_RPC: ${env:SOLANA_RPC} RSK_ARCHIVAL_RPC: ${env:RSK_ARCHIVAL_RPC} DISCORD_WEBHOOK: ${env:DISCORD_WEBHOOK} + ALLIUM_API_KEY: ${env:ALLIUM_API_KEY} functions: 
bridgeDayStats: @@ -177,6 +177,12 @@ functions: - http: path: netflows/{period} method: get + runWormhole: + handler: src/handlers/runWormhole.default + timeout: 900 + events: + - schedule: cron(0 * * * ? *) + resources: # CORS for api gateway errors - ${file(resources/api-gateway-errors.yml)} diff --git a/src/adapters/index.ts b/src/adapters/index.ts index 53bc0f83..6df1efe4 100644 --- a/src/adapters/index.ts +++ b/src/adapters/index.ts @@ -69,9 +69,10 @@ import fastbtc from "./rootstock-fastbtc-bridge"; import crowdswap from "./crowdswap"; import mint from "./mint"; import suibridge from "./suibridge"; -import retrobridge from "./retrobridge" -import layerswap from "./layerswap" +import retrobridge from "./retrobridge"; +import layerswap from "./layerswap"; import hyperlane from "./hyperlane"; +import wormhole from "./wormhole"; export default { polygon, @@ -147,6 +148,7 @@ export default { retrobridge, layerswap, hyperlane, + wormhole, } as { [bridge: string]: BridgeAdapter | AsyncBridgeAdapter; }; diff --git a/src/adapters/wormhole/index.ts b/src/adapters/wormhole/index.ts new file mode 100644 index 00000000..98f797c1 --- /dev/null +++ b/src/adapters/wormhole/index.ts @@ -0,0 +1,138 @@ +import dayjs from "dayjs"; +import { queryAllium } from "../../helpers/allium"; + +type WormholeBridgeEvent = { + block_timestamp: string; + transaction_hash: string; + token_transfer_from_address: string; + token_transfer_to_address: string; + token_address: string; + token_usd_amount: string; + token_amount: string; + source_chain: string; + destination_chain: string; +}; + +const chains = [ + "ethereum", + "avalanche", + "avax", + "bsc", + "polygon", + "arbitrum", + "optimism", + "base", + "zksync", + "scroll", + "aptos", + "sui", + "solana", + "sei", + "mantle", + "fantom", + "injective", + "moonbeam", + "oasis", + "celo", + "kaia", + "near", + "algorand", + "terra", + "terra classic", + "karura", + "acala", +]; + +export const chainNameMapping: { [key: string]: string } = { + ethereum: "Ethereum", + avalanche: "Avalanche", + avax: "Avalanche", + polygon: "Polygon", + arbitrum: "Arbitrum", + optimism: "Optimism", + fantom: "Fantom", + base: "Base", + solana: "Solana", + sui: "Sui", + aptos: "Aptos", + celo: "Celo", + mantle: "Mantle", + scroll: "Scroll", + algorand: "Algorand", + sei: "Sei", + moonbeam: "Moonbeam", + injective: "Injective", + kaia: "Kaia", + oasis: "Oasis", + BNB_Smart_Chain: "BSC", + bnb_smart_chain: "BSC", + terra: "Terra Classic", + terra2: "Terra", + "terra classic": "Terra Classic", + near: "Near", +}; + +export function normalizeChainName(chainName: string): string { + const lowercaseChain = chainName.toLowerCase(); + + const mapping = chainNameMapping[lowercaseChain] || chainNameMapping[chainName]; + + if (mapping) { + return mapping?.toLowerCase(); + } + + return chainName?.toLowerCase(); +} + +export const fetchWormholeEvents = async ( + fromTimestamp: number, + toTimestamp: number +): Promise => { + let allResults: WormholeBridgeEvent[] = []; + let currentTimestamp = fromTimestamp; + const BATCH_SIZE = 10000; + + while (currentTimestamp < toTimestamp) { + const result = await queryAllium(` + select + BLOCK_TIMESTAMP, + TOKEN_TRANSFER_FROM_ADDRESS, + TOKEN_TRANSFER_TO_ADDRESS, + TOKEN_ADDRESS, + TOKEN_USD_AMOUNT, + TOKEN_AMOUNT, + SOURCE_CHAIN, + DESTINATION_CHAIN, + UNIQUE_ID AS transaction_hash + from org_db__defillama.default.wormhole_token_transfers + where + block_timestamp BETWEEN TO_TIMESTAMP_NTZ(${currentTimestamp}) AND TO_TIMESTAMP_NTZ(${toTimestamp}) + and status 
!= 'REFUNDED' + order by block_timestamp + limit ${BATCH_SIZE}; + `); + + if (result.length === 0) break; + + const normalizedBatch = result.map((row: any) => ({ + ...row, + block_timestamp: dayjs(row.block_timestamp).unix(), + })); + + allResults = [...allResults, ...normalizedBatch]; + console.log(`Fetched ${allResults.length} Wormhole events.`); + + currentTimestamp = normalizedBatch[normalizedBatch.length - 1].block_timestamp + 1; + + if (result.length < BATCH_SIZE) break; + } + + return allResults; +}; + +const adapter = chains.reduce((acc: any, chain: string) => { + acc[chain] = true; + return acc; +}, {}); + +export default adapter; diff --git a/src/data/bridgeNetworkData.ts b/src/data/bridgeNetworkData.ts index dc2e147e..825f5225 100644 --- a/src/data/bridgeNetworkData.ts +++ b/src/data/bridgeNetworkData.ts @@ -122,7 +122,6 @@ export default [ // avalanche: "avax", // }, // }, - { id: 10, @@ -1514,7 +1513,7 @@ export default [ bridgeDbName: "retrobridge", iconLink: "icons:retrobridge", largeTxThreshold: 10000, - url: 'https://retrobridge.io/', + url: "https://retrobridge.io/", chains: [ "Ethereum", "Arbitrum", @@ -1573,7 +1572,7 @@ export default [ "Linea", "Blast", "Scroll", - "BSC", + "BSC", "X Layer", "Taiko", "ZKsync Era", @@ -1689,4 +1688,42 @@ export default [ avalanche: "avax", }, }, + { + id: 77, + displayName: "Wormhole", + bridgeDbName: "wormhole", + iconLink: "icons:portal", + largeTxThreshold: 10000, + url: "https://portalbridge.com/", + chains: [ + "Ethereum", + "Polygon", + "Fantom", + "Avalanche", + "Aurora", + "Celo", + "Klaytn", + "BSC", + "Moonbeam", + "Optimism", + "Arbitrum", + "Base", + "Solana", + "Near", + "Aptos", + "Sui", + "Sei", + "Karura", + "Acala", + "Algorand", + "Terra", + "Terra Classic", + "Oasis", + "Celo", + "Kaia", + ], + chainMapping: { + avalanche: "avax", + }, + }, ] as BridgeNetwork[]; diff --git a/src/handlers/runWormhole.ts b/src/handlers/runWormhole.ts new file mode 100644 index 00000000..7de40b09 --- /dev/null +++ b/src/handlers/runWormhole.ts @@ -0,0 +1,120 @@ +import { wrapScheduledLambda } from "../utils/wrap"; +import adapter, { fetchWormholeEvents, normalizeChainName } from "../adapters/wormhole"; +import { sql } from "../utils/db"; +import { closeIdleConnections, insertTransactionRow } from "../utils/wrappa/postgres/write"; +import { getBridgeID } from "../utils/wrappa/postgres/query"; +import dayjs from "dayjs"; +import { insertConfigEntriesForAdapter } from "../utils/adapter"; + +const handler = async () => { + try { + await closeIdleConnections(); + await insertConfigEntriesForAdapter(adapter, "wormhole"); + const startTs = dayjs().subtract(1, "day").unix(); + const endTs = dayjs().unix(); + const bridgeIds = Object.fromEntries( + await Promise.all( + Object.keys(adapter).map(async (chain) => { + chain = chain.toLowerCase(); + const bridgeId = await getBridgeID("wormhole", chain); + return [chain, bridgeId?.id]; + }) + ) + ); + console.log( + `Running Wormhole adapter for ${startTs} (${dayjs + .unix(startTs) + .format("YYYY-MM-DD HH:mm:ss")}) to ${endTs} (${dayjs.unix(endTs).format("YYYY-MM-DD HH:mm:ss")})` + ); + const events = await fetchWormholeEvents(startTs, endTs); + const BATCH_SIZE = 500; + + await sql.begin(async (sql) => { + for (let i = 0; i < events.length; i += BATCH_SIZE) { + const batch = events.slice(i, i + BATCH_SIZE); + const insertPromises: Promise[] = []; + + for (const event of batch) { + const { + block_timestamp, + transaction_hash, + token_transfer_from_address, + token_transfer_to_address, + 
token_address, + token_usd_amount, + source_chain, + destination_chain, + } = event; + + const sourceChain = normalizeChainName(source_chain); + const destinationChain = normalizeChainName(destination_chain); + + if (bridgeIds[sourceChain]) { + try { + insertPromises.push( + insertTransactionRow( + sql, + true, + { + bridge_id: bridgeIds[sourceChain], + chain: sourceChain, + tx_hash: transaction_hash, + ts: parseInt(block_timestamp) * 1000, + tx_block: null, + tx_from: token_transfer_from_address ?? "0x", + tx_to: token_transfer_to_address ?? "0x", + token: token_address ?? "0x0000000000000000000000000000000000000000", + amount: token_usd_amount || "0", + is_deposit: true, + is_usd_volume: true, + txs_counted_as: 1, + origin_chain: null, + }, + "upsert" + ) + ); + } catch (error) { + console.error(`Error inserting Wormhole event: ${error}`, event); + } + } + + if (bridgeIds[destinationChain]) { + try { + insertPromises.push( + insertTransactionRow( + sql, + true, + { + bridge_id: bridgeIds[destinationChain], + chain: destinationChain, + tx_hash: `${transaction_hash}_destination`, + ts: parseInt(block_timestamp) * 1000, + tx_block: null, + tx_from: token_transfer_to_address ?? "0x", + tx_to: token_transfer_from_address ?? "0x", + token: token_address ?? "0x0000000000000000000000000000000000000000", + amount: token_usd_amount || "0", + is_deposit: false, + is_usd_volume: true, + txs_counted_as: 1, + origin_chain: null, + }, + "upsert" + ) + ); + } catch (error) { + console.error(`Error inserting Wormhole event: ${error}`, event); + } + } + } + + await Promise.all(insertPromises); + } + }); + } catch (error) { + console.error("Error processing Wormhole events:", error); + throw error; + } +}; + +export default wrapScheduledLambda(handler); diff --git a/src/helpers/allium.ts b/src/helpers/allium.ts new file mode 100644 index 00000000..e1a30345 --- /dev/null +++ b/src/helpers/allium.ts @@ -0,0 +1,70 @@ +import axios from "axios"; + +const token = {} as Record; + +const HEADERS = { + "Content-Type": "application/json", + "X-API-KEY": process.env.ALLIUM_API_KEY, +} as Record; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function startAlliumQuery(sqlQuery: string) { + const query = await axios.post( + `https://api.allium.so/api/v1/explorer/queries/phBjLzIZ8uUIDlp0dD3N/run-async`, + { + parameters: { + fullQuery: sqlQuery, + }, + }, + { + headers: HEADERS, + } + ); + + return query.data["run_id"]; +} + +async function retrieveAlliumResults(queryId: string) { + const results = await axios.get(`https://api.allium.so/api/v1/explorer/query-runs/${queryId}/results?f=json`, { + headers: HEADERS, + }); + return results.data.data; +} + +async function queryAllium(sqlQuery: string) { + const startTime = Date.now(); + for (let i = 0; i < 20; i++) { + console.log(`Querying Allium. 
Attempt ${i}`); + if (!token[sqlQuery]) { + token[sqlQuery] = await startAlliumQuery(sqlQuery); + } + + if (!token[sqlQuery]) { + throw new Error("Couldn't get a token from allium"); + } + + const statusReq = await axios.get(`https://api.allium.so/api/v1/explorer/query-runs/${token[sqlQuery]}/status`, { + headers: HEADERS, + }); + const status = statusReq.data; + if (status === "success") { + try { + const results = await retrieveAlliumResults(token[sqlQuery]); + delete token[sqlQuery]; + return results; + } catch (e) { + console.log("query result", e); + throw e; + } + } else if (status === "failed") { + delete token[sqlQuery]; + continue; + } + await sleep(20e3); + } + console.log(`Query ${sqlQuery} took ${(Date.now() - startTime) / 1000}s`); + throw new Error("Not processed in time"); +} + +export { queryAllium, startAlliumQuery, retrieveAlliumResults }; diff --git a/src/helpers/cache.ts b/src/helpers/cache.ts new file mode 100644 index 00000000..b254b4fa --- /dev/null +++ b/src/helpers/cache.ts @@ -0,0 +1,125 @@ +import * as sdk from "@defillama/sdk"; +import axios from "axios"; + +const Bucket = "dimensions-adapter-cache"; + +function getKey(project: string, chain: string) { + return `cache/${project}/${chain}.json`; +} + +function getFileKey(project: string, chain: string) { + return `${Bucket}/${getKey(project, chain)}`; +} + +export async function getCache(project: string, chain: string, {} = {}) { + const fileKey = getFileKey(project, chain); + + try { + const json = await sdk.cache.readCache(fileKey); + if (!json || Object.keys(json).length === 0) throw new Error("Invalid data"); + return json; + } catch (e) { + sdk.log("failed to fetch data from s3 bucket:", fileKey); + // sdk.log(e) + return {}; + } +} + +export async function setCache(project: string, chain: string, cache: any) { + const Key = getFileKey(project, chain); + + try { + await sdk.cache.writeCache(Key, cache); + } catch (e) { + sdk.log("failed to write data to s3 bucket: ", Key); + sdk.log(e); + } +} + +const configCache: any = {}; + +async function _setCache(project: string, chain: string, json: any) { + if (!json || json?.error?.message) return; + const strData = typeof json === "string" ? 
json : JSON.stringify(json); + let isValidData = strData.length > 42; + if (isValidData) + // sometimes we get bad data/empty object, we dont overwrite cache with it + await setCache(project, chain, json); +} + +export async function getConfig( + project: string, + endpoint: string, + { + fetcher, + }: { + fetcher?: () => Promise; + } = {} +) { + if (!project || (!endpoint && !fetcher)) throw new Error("Missing parameters"); + const key = "config-cache"; + const cacheKey = getKey(key, project); + if (!configCache[cacheKey]) configCache[cacheKey] = _getConfig(); + return configCache[cacheKey]; + + async function _getConfig() { + try { + let json; + if (endpoint) { + json = (await axios.get(endpoint)).data; + } else { + json = await fetcher!(); + } + if (!json) throw new Error("Invalid data"); + await _setCache(key, project, json); + return json; + } catch (e) { + // sdk.log(e) + sdk.log(project, "tryng to fetch from cache, failed to fetch data from endpoint:", endpoint); + return getCache(key, project); + } + } +} + +export async function configPost(project: string, endpoint: string, data: any) { + if (!project || !endpoint) throw new Error("Missing parameters"); + const key = "config-cache"; + const cacheKey = getKey(key, project); + if (!configCache[cacheKey]) configCache[cacheKey] = _configPost(); + return configCache[cacheKey]; + + async function _configPost() { + try { + const { data: json } = await axios.post(endpoint, data); + await _setCache(key, project, json); + return json; + } catch (e) { + // sdk.log(e) + sdk.log(project, "tryng to fetch from cache, failed to fetch data from endpoint:", endpoint); + return getCache(key, project); + } + } +} + +export async function cacheTransactions(cacheKey: string, cache: any) { + const Key = getFileKey("transactions", cacheKey); + + try { + await sdk.cache.writeCache(Key, cache, { skipR2CacheWrite: true }); + } catch (e) { + sdk.log("failed to write data to s3 bucket: ", Key); + sdk.log(e); + } +} + +export async function readCachedTransactions(cacheKey: string) { + const Key = getFileKey("transactions", cacheKey); + + try { + const data = await sdk.cache.readCache(Key, { skipR2Cache: true }); + return data; + } catch (e) { + sdk.log("failed to read data: ", Key); + return {}; + } +} diff --git a/src/utils/adapter.ts b/src/utils/adapter.ts index 6fb2caa5..a52f2aba 100644 --- a/src/utils/adapter.ts +++ b/src/utils/adapter.ts @@ -376,6 +376,12 @@ export const runAdapterHistorical = async ( const currentTimestamp = await getCurrentUnixTimestamp(); const bridgeNetwork = bridgeNetworks.filter((bridgeNetwork) => bridgeNetwork.id === bridgeNetworkId)[0]; const { bridgeDbName } = bridgeNetwork; + + if (bridgeDbName === "wormhole") { + console.log("Skipping Wormhole adapter, handled separately"); + return; + } + let adapter = adapters[bridgeDbName]; adapter = isAsyncAdapter(adapter) ? 
await adapter.build() : adapter; diff --git a/src/utils/wrappa/postgres/query.ts b/src/utils/wrappa/postgres/query.ts index 5d39438d..9da6043f 100644 --- a/src/utils/wrappa/postgres/query.ts +++ b/src/utils/wrappa/postgres/query.ts @@ -377,20 +377,25 @@ const getNetflows = async (period: TimePeriod) => { WITH period_flows AS ( SELECT c.chain, - SUM(ha.total_deposited_usd - ha.total_withdrawn_usd) as net_flow + SUM(CASE + -- Deposits are outflows (-), withdrawals are inflows (+) + WHEN c.destination_chain IS NULL THEN (ha.total_withdrawn_usd - ha.total_deposited_usd) + -- For bridges with fixed destination chains, count flows on destination chain (inverted) + ELSE (ha.total_deposited_usd - ha.total_withdrawn_usd) + END) as net_flow FROM bridges.hourly_aggregated ha JOIN bridges.config c ON ha.bridge_id = c.id - WHERE ha.ts >= date_trunc(${period}, NOW()) - ${intervalPeriod} - AND ha.ts < date_trunc(${period}, NOW()) + WHERE ha.ts >= date_trunc(${period}, NOW() AT TIME ZONE 'UTC') - ${intervalPeriod} + AND ha.ts < date_trunc(${period}, NOW() AT TIME ZONE 'UTC') AND LOWER(c.chain) NOT LIKE '%dydx%' GROUP BY c.chain ) SELECT chain, - net_flow + net_flow::text FROM period_flows WHERE net_flow IS NOT NULL - ORDER BY ABS(net_flow) DESC; + ORDER BY ABS(net_flow::numeric) DESC; `; };
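
Reviewer note appended after the diff (not part of the change itself): the chain-name normalization is what ties Allium's `source_chain`/`destination_chain` strings to the `bridges.config` rows looked up via `getBridgeID`, so a few spot-checks are worth keeping around. A minimal sketch, assuming it is run from the repo root so the relative import resolves; the expected values simply mirror the `chainNameMapping` table introduced above.

```ts
// Hypothetical spot-checks for normalizeChainName (not part of this PR).
import * as assert from "assert";
import { normalizeChainName } from "./src/adapters/wormhole";

assert.strictEqual(normalizeChainName("avax"), "avalanche");        // alias collapses to one name
assert.strictEqual(normalizeChainName("BNB_Smart_Chain"), "bsc");   // mixed-case key is also mapped
assert.strictEqual(normalizeChainName("terra"), "terra classic");   // Wormhole's "terra" is Terra Classic
assert.strictEqual(normalizeChainName("terra2"), "terra");
assert.strictEqual(normalizeChainName("zksync"), "zksync");         // unmapped names fall through lowercased
console.log("normalizeChainName spot-checks passed");
```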
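
A second reviewer note: end to end, `runWormhole` pulls a 24-hour window from Allium every hour and upserts one deposit row per source chain plus one `_destination`-suffixed withdrawal row per destination chain. Below is a minimal local smoke-test sketch of the fetch half, assuming `ALLIUM_API_KEY` is exported and the file sits at the repo root; the one-hour window and the `main` wrapper are illustrative only.

```ts
// Hypothetical smoke test for fetchWormholeEvents (not part of this PR).
import dayjs from "dayjs";
import { fetchWormholeEvents, normalizeChainName } from "./src/adapters/wormhole";

async function main() {
  // Use a one-hour window instead of the handler's 24 hours to keep the result set small.
  const toTs = dayjs().unix();
  const fromTs = dayjs().subtract(1, "hour").unix();

  const events = await fetchWormholeEvents(fromTs, toTs);
  console.log(`Fetched ${events.length} transfers`);

  // Check that the chains Allium reports normalize to names getBridgeID will find.
  for (const e of events.slice(0, 5)) {
    console.log(
      `${normalizeChainName(e.source_chain)} -> ${normalizeChainName(e.destination_chain)}`,
      e.token_usd_amount
    );
  }
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```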
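
Last note, on the new `src/helpers/allium.ts`: `queryAllium` submits the SQL as an asynchronous run, caches the run id per query string, polls the status endpoint every 20 seconds for up to 20 attempts, and resolves with the result rows once the run reports success. A standalone usage sketch follows, under the assumptions that the warehouse speaks Snowflake-style SQL (the adapter's `TO_TIMESTAMP_NTZ` suggests it does) and that `ALLIUM_API_KEY` is set before the module is imported; the example query itself is arbitrary.

```ts
// Hypothetical standalone use of the Allium helper (not part of this PR).
import { queryAllium } from "./src/helpers/allium";

async function peek() {
  // Same table the adapter reads; dateadd/current_timestamp are assumed Snowflake syntax.
  const rows = await queryAllium(`
    select source_chain, count(*) as transfers
    from org_db__defillama.default.wormhole_token_transfers
    where block_timestamp >= dateadd('hour', -1, current_timestamp())
    group by source_chain
    order by transfers desc
    limit 10;
  `);
  console.log(rows);
}

peek().catch(console.error);
```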