diff --git a/adapters/xfai/hourly_blocks.csv b/adapters/xfai/hourly_blocks.csv
index d862ab16..f25b33ee 100644
--- a/adapters/xfai/hourly_blocks.csv
+++ b/adapters/xfai/hourly_blocks.csv
@@ -1,2 +1,2 @@
 number,timestamp
-3976979,1713974398
+4828868,1713974398
diff --git a/adapters/xfai/src/config.ts b/adapters/xfai/src/config.ts
index e0d6a1b3..519216d1 100644
--- a/adapters/xfai/src/config.ts
+++ b/adapters/xfai/src/config.ts
@@ -1,9 +1,10 @@
 export const CHAIN_ID = 59144;
 export const RPC_URL = "https://rpc.linea.build";
-export const LIQUIDITY_EVENTS_DB =
-  "postgres://mobula_readonly:password@analytics.cb5emp3h2kdo.eu-central-1.rds.amazonaws.com:5432/xfai-logs?sslmode=no-verify";
 export const XFAI_FACTORY = "0xa5136eAd459F0E61C99Cec70fe8F5C24cF3ecA26";
 export const XFAI_POOL_INIT =
   "0xd29425d309539268aa2f934062f86ea332822e787dafc6baba7cfda029630330";
 export const MULTICALL = "0xca11bde05977b3631167028862be2a173976ca11";
 export const WETH = "0xe5D7C2a44FfDDf6b295A15c148167daaAf5Cf34f";
+
+export const SUBGRAPH_URL =
+  "https://api.studio.thegraph.com/query/68034/xfai-dex/v1.0.0";
diff --git a/adapters/xfai/src/index.ts b/adapters/xfai/src/index.ts
index de3bc072..5e8ae3fb 100644
--- a/adapters/xfai/src/index.ts
+++ b/adapters/xfai/src/index.ts
@@ -1,8 +1,7 @@
-import { Client } from "pg";
 import {
   CHAIN_ID,
-  LIQUIDITY_EVENTS_DB,
   RPC_URL,
+  SUBGRAPH_URL,
   WETH,
   XFAI_FACTORY,
   XFAI_POOL_INIT,
@@ -40,12 +39,6 @@ type OutputDataSchemaRow = {
   usd_price: number; //assign 0 if not available
 };
 
-async function getDBConnection() {
-  const client = new Client(LIQUIDITY_EVENTS_DB);
-  await client.connect();
-  return client;
-}
-
 async function getProvider() {
   const provider = new StaticJsonRpcProvider(RPC_URL, CHAIN_ID);
   await provider.ready;
@@ -55,30 +48,114 @@ async function getProvider() {
 type ChangedLiquidity = {
   owner: string;
   token: string;
-  liquidity: number;
+  liquidity: bigint;
 };
 
+type ChangedLiquidityWithBlock = ChangedLiquidity & {
+  blockNumber: bigint;
+};
+
+function delay(ms: number) {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+const getTokenTransfers = async (
+  blockNumber: number
+): Promise<ChangedLiquidity[]> => {
+  const PER_PAGE = 990;
+  let skip = 0;
+  let fetchNext = true;
+  let result: ChangedLiquidity[] = [];
+  let lastBlock = 0n;
+  loop: while (fetchNext) {
+    let query = ` {
+      liquidityChanges(first: ${PER_PAGE}, skip: ${skip}, where: { blockNumber_lte: ${blockNumber}, blockNumber_gt: ${lastBlock} },
+        orderBy: blockNumber, orderDirection: asc) {
+        owner
+        token
+        liquidity
+        blockNumber
+      }
+    }`;
+
+    let response;
+    let count = 0;
+
+    do {
+      response = await fetch(SUBGRAPH_URL, {
+        method: "POST",
+        body: JSON.stringify({ query }),
+        headers: { "Content-Type": "application/json" },
+      });
+      if (response.status != 200) {
+        console.log("fetching failed. Try again in 15 sec");
+        await delay(15000);
+      }
+      ++count;
+    } while (response.status != 200 && count < 10);
+
+    let data = await response.json();
+    let positions: ChangedLiquidityWithBlock[] = data.data.liquidityChanges;
+    lastBlock = BigInt(positions[positions.length - 1].blockNumber);
+    for (let i = 0; i < positions.length; i++) {
+      if (
+        positions.length === PER_PAGE &&
+        BigInt(positions[i].blockNumber) == lastBlock
+      ) {
+        lastBlock = BigInt(positions[i - 1].blockNumber);
+        skip = 0;
+        continue loop;
+      }
+      let position = positions[i];
+      result.push({
+        owner: position.owner,
+        token: position.token,
+        liquidity: BigInt(position.liquidity),
+      });
+    }
+    if (positions.length < PER_PAGE) {
+      fetchNext = false;
+    } else {
+      skip += PER_PAGE;
+    }
+  }
+  return result;
+};
+
+// group transfers by owner,token and sum liquidity
+function getLiquidityFromTransfers(
+  transfers: ChangedLiquidity[]
+): ChangedLiquidity[] {
+  const groupedTransfers: ChangedLiquidity[] = [];
+  const transferMap: Map<string, Map<string, bigint>> = new Map();
+
+  for (const transfer of transfers) {
+    const { owner, token, liquidity } = transfer;
+    const ownerMap = transferMap.get(owner) || new Map();
+    const existingLiquidity = ownerMap.get(token) || 0n;
+    ownerMap.set(token, existingLiquidity + liquidity);
+    transferMap.set(owner, ownerMap);
+  }
+
+  for (const [owner, tokenMap] of transferMap) {
+    for (const [token, liquidity] of tokenMap) {
+      if (liquidity == 0n) {
+        continue;
+      }
+      groupedTransfers.push({ owner, token, liquidity });
+    }
+  }
+  return groupedTransfers;
+}
 export async function getUserTVLByBlock(
   block: BlockData
 ): Promise<OutputDataSchemaRow[]> {
-  const client = await getDBConnection();
   const provider = await getProvider();
 
-  const liquidities = await client.query({
-    text: `
-      SELECT owner,
-       token,
-       sum(liquidity) as liquidity
-      FROM "LiquidityTrace"
-      WHERE "blockNumber" <= $1
-      AND LOWER("token") != LOWER($2)
-      GROUP BY "owner", "token"
-      HAVING sum(liquidity) > 0;`,
-    values: [block.blockNumber, WETH],
-  });
-  const pgSqlShutdown = client.end();
+  const transfers = await getTokenTransfers(block.blockNumber);
+
+  const liquidities = getLiquidityFromTransfers(transfers);
 
-  const liquiditiesRows = liquidities.rows.map((r) => ({
+  const liquiditiesRows = liquidities.map((r) => ({
     ...r,
     pool: getPoolAddressFromTokenAddress(r.token),
     liquidity: BigInt(r.liquidity),
@@ -159,7 +236,6 @@ export async function getUserTVLByBlock(
       ];
     }
   );
-  await Promise.all([pgSqlShutdown]);
 
   return result;
 }
@@ -192,9 +268,6 @@ readBlocksFromCSV("hourly_blocks.csv")
   .then(async (blocks: any[]) => {
     console.log(blocks);
     const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
-    const batchSize = 1000; // Size of batch to trigger writing to the file
-    let i = 0;
-
    for (const block of blocks) {
      try {
        const result = await getUserTVLByBlock(block);
@@ -205,8 +278,6 @@ readBlocksFromCSV("hourly_blocks.csv")
      }
    }
    await new Promise((resolve, reject) => {
-      // const randomTime = Math.random() * 1000;
-      // setTimeout(resolve, randomTime);
      const ws = fs.createWriteStream(`outputData.csv`, { flags: "w" });
      write(allCsvRows, { headers: true })
       .pipe(ws)
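
Note on the subgraph pagination in getTokenTransfers: The Graph limits skip-based paging (historically first is capped at 1000 and skip at 5000), so the adapter pages by block number instead. Whenever a full page of PER_PAGE rows comes back, it drops the rows of the final, possibly truncated block, rewinds the blockNumber_gt cursor to the last fully consumed block, and restarts with skip 0. The sketch below isolates that cursor technique and adds a fallback for a page consisting of a single block, a case the positions[i - 1] rewind above cannot handle (with i == 0 it reads positions[-1]). The names Row, fetchPage, and fetchAllRows are illustrative only; just the liquidityChanges entity, the SUBGRAPH_URL-style endpoint, and the 990-row page size come from this PR.

type Row = { blockNumber: string };

async function fetchPage(
  url: string,
  perPage: number,
  skip: number,
  gtBlock: bigint,
  lteBlock: number
): Promise<Row[]> {
  const query = `{
    liquidityChanges(first: ${perPage}, skip: ${skip},
        where: { blockNumber_lte: ${lteBlock}, blockNumber_gt: ${gtBlock} },
        orderBy: blockNumber, orderDirection: asc) {
      owner token liquidity blockNumber
    }
  }`;
  const res = await fetch(url, {
    method: "POST",
    body: JSON.stringify({ query }),
    headers: { "Content-Type": "application/json" },
  });
  const json = await res.json();
  return json.data.liquidityChanges;
}

async function fetchAllRows(url: string, lteBlock: number): Promise<Row[]> {
  const PER_PAGE = 990;
  let cursor = 0n; // highest block whose rows are all in `out`
  let skip = 0;
  const out: Row[] = [];
  for (;;) {
    const page = await fetchPage(url, PER_PAGE, skip, cursor, lteBlock);
    if (page.length < PER_PAGE) {
      out.push(...page); // short page: nothing left beyond it
      return out;
    }
    const boundary = BigInt(page[page.length - 1].blockNumber);
    // Rows of the boundary block may continue onto the next page, so keep
    // only rows of fully consumed blocks and re-query from the boundary.
    const complete = page.filter((r) => BigInt(r.blockNumber) < boundary);
    if (complete.length === 0) {
      // The whole page is one block; fall back to plain skip paging
      // inside it instead of rewinding (a rewind here would loop forever).
      out.push(...page);
      skip += PER_PAGE;
    } else {
      out.push(...complete);
      cursor = BigInt(complete[complete.length - 1].blockNumber);
      skip = 0;
    }
  }
}

Like the skip paging in the PR, this assumes the subgraph returns rows that share a blockNumber in a stable order across queries.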
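
One more note, on the retry loop in getTokenTransfers: the do/while gives up after ten non-200 responses but then falls through to response.json(), so a persistent subgraph outage surfaces as a crash on data.data.liquidityChanges rather than as an HTTP error. Below is a sketch of a wrapper that fails loudly instead; postWithRetry is a hypothetical helper, not something this PR defines.

function delay(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function postWithRetry(
  url: string,
  body: unknown,
  attempts = 10,
  waitMs = 15_000
): Promise<any> {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    const response = await fetch(url, {
      method: "POST",
      body: JSON.stringify(body),
      headers: { "Content-Type": "application/json" },
    });
    if (response.status === 200) {
      return response.json(); // parsed GraphQL envelope ({ data: ... })
    }
    console.log(`fetch failed (HTTP ${response.status}), attempt ${attempt} of ${attempts}`);
    if (attempt < attempts) {
      await delay(waitMs); // same 15 s backoff the PR uses
    }
  }
  throw new Error(`POST ${url} failed after ${attempts} attempts`);
}

getTokenTransfers could then replace its inline do/while with a single call, for example: const data = await postWithRetry(SUBGRAPH_URL, { query });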