
Commit

Merge pull request #195 from xfai-labs/add-xfai
[xfai] switch to thegraph
0xroll authored May 31, 2024
2 parents 353c985 + 4b60193 commit 7776842
Showing 3 changed files with 105 additions and 33 deletions.
2 changes: 1 addition & 1 deletion adapters/xfai/hourly_blocks.csv
@@ -1,2 +1,2 @@
 number,timestamp
-3976979,1713974398
+4828868,1713974398
5 changes: 3 additions & 2 deletions adapters/xfai/src/config.ts
@@ -1,9 +1,10 @@
 export const CHAIN_ID = 59144;
 export const RPC_URL = "https://rpc.linea.build";
-export const LIQUIDITY_EVENTS_DB =
-  "postgres://mobula_readonly:[email protected]:5432/xfai-logs?sslmode=no-verify";
 export const XFAI_FACTORY = "0xa5136eAd459F0E61C99Cec70fe8F5C24cF3ecA26";
 export const XFAI_POOL_INIT =
   "0xd29425d309539268aa2f934062f86ea332822e787dafc6baba7cfda029630330";
 export const MULTICALL = "0xca11bde05977b3631167028862be2a173976ca11";
 export const WETH = "0xe5D7C2a44FfDDf6b295A15c148167daaAf5Cf34f";
+
+export const SUBGRAPH_URL =
+  "https://api.studio.thegraph.com/query/68034/xfai-dex/v1.0.0";
131 changes: 101 additions & 30 deletions adapters/xfai/src/index.ts
@@ -1,8 +1,7 @@
-import { Client } from "pg";
 import {
   CHAIN_ID,
-  LIQUIDITY_EVENTS_DB,
   RPC_URL,
+  SUBGRAPH_URL,
   WETH,
   XFAI_FACTORY,
   XFAI_POOL_INIT,
@@ -40,12 +39,6 @@ type OutputDataSchemaRow = {
   usd_price: number; //assign 0 if not available
 };
 
-async function getDBConnection() {
-  const client = new Client(LIQUIDITY_EVENTS_DB);
-  await client.connect();
-  return client;
-}
-
 async function getProvider() {
   const provider = new StaticJsonRpcProvider(RPC_URL, CHAIN_ID);
   await provider.ready;
@@ -55,30 +48,114 @@ async function getProvider() {
 type ChangedLiquidity = {
   owner: string;
   token: string;
-  liquidity: number;
+  liquidity: bigint;
 };
+type ChangedLiquidityWithBlock = ChangedLiquidity & {
+  blockNumber: bigint;
+};
+
+function delay(ms: number) {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+const getTokenTransfers = async (
+  blockNumber: number
+): Promise<ChangedLiquidity[]> => {
+  const PER_PAGE = 990;
+  let skip = 0;
+  let fetchNext = true;
+  let result: ChangedLiquidity[] = [];
+  let lastBlock = 0n;
+  loop: while (fetchNext) {
+    let query = ` {
+      liquidityChanges(first:${PER_PAGE}, skip: ${skip} , where:{ blockNumber_lte: ${blockNumber}, blockNumber_gt: ${lastBlock} }, orderBy: blockNumber, orderDirection: asc) {
+        owner
+        token
+        liquidity
+        blockNumber
+      }
+    }`;
+
+    let response;
+    let count = 0;
+
+    do {
+      response = await fetch(SUBGRAPH_URL, {
+        method: "POST",
+        body: JSON.stringify({ query }),
+        headers: { "Content-Type": "application/json" },
+      });
+      if (response.status != 200) {
+        console.log("fetching failed. Try again in 15 sec");
+        await delay(15000);
+      }
+      ++count;
+    } while (response.status != 200 && count < 10);
+
+    let data = await response.json();
+    let positions: ChangedLiquidityWithBlock[] = data.data.liquidityChanges;
+    lastBlock = BigInt(positions[positions.length - 1].blockNumber);
+    for (let i = 0; i < positions.length; i++) {
+      if (
+        positions.length === PER_PAGE &&
+        BigInt(positions[i].blockNumber) == lastBlock
+      ) {
+        lastBlock = BigInt(positions[i - 1].blockNumber);
+        skip = 0;
+        continue loop;
+      }
+      let position = positions[i];
+      result.push({
+        owner: position.owner,
+        token: position.token,
+        liquidity: BigInt(position.liquidity),
+      });
+    }
+    if (positions.length < PER_PAGE) {
+      fetchNext = false;
+    } else {
+      skip += PER_PAGE;
+    }
+  }
+  return result;
+};
+
+// group transfers by owner,token and sum liquidity
+function getLiquidityFromTransfers(
+  transfers: ChangedLiquidity[]
+): ChangedLiquidity[] {
+  const groupedTransfers: ChangedLiquidity[] = [];
+  const transferMap: Map<string, Map<string, bigint>> = new Map();
+
+  for (const transfer of transfers) {
+    const { owner, token, liquidity } = transfer;
+    const ownerMap = transferMap.get(owner) || new Map();
+    const existingLiquidity = ownerMap.get(token) || 0n;
+    ownerMap.set(token, existingLiquidity + liquidity);
+    transferMap.set(owner, ownerMap);
+  }
+
+  for (const [owner, tokenMap] of transferMap) {
+    for (const [token, liquidity] of tokenMap) {
+      if (liquidity == 0n) {
+        continue;
+      }
+      groupedTransfers.push({ owner, token, liquidity });
+    }
+  }
+  return groupedTransfers;
+}
 
 export async function getUserTVLByBlock(
   block: BlockData
 ): Promise<OutputDataSchemaRow[]> {
-  const client = await getDBConnection();
   const provider = await getProvider();
 
-  const liquidities = await client.query<ChangedLiquidity>({
-    text: `
-      SELECT owner,
-      token,
-      sum(liquidity) as liquidity
-      FROM "LiquidityTrace"
-      WHERE "blockNumber" <= $1
-      AND LOWER("token") != LOWER($2)
-      GROUP BY "owner", "token"
-      HAVING sum(liquidity) > 0;`,
-    values: [block.blockNumber, WETH],
-  });
-  const pgSqlShutdown = client.end();
+  const transfers = await getTokenTransfers(block.blockNumber);
+
+  const liquidities = getLiquidityFromTransfers(transfers);
 
-  const liquiditiesRows = liquidities.rows.map((r) => ({
+  const liquiditiesRows = liquidities.map((r) => ({
     ...r,
     pool: getPoolAddressFromTokenAddress(r.token),
     liquidity: BigInt(r.liquidity),
@@ -159,7 +236,6 @@ export async function getUserTVLByBlock(
       ];
     }
   );
-  await Promise.all([pgSqlShutdown]);
 
   return result;
 }
@@ -192,9 +268,6 @@ readBlocksFromCSV("hourly_blocks.csv")
   .then(async (blocks: any[]) => {
     console.log(blocks);
     const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
-    const batchSize = 1000; // Size of batch to trigger writing to the file
-    let i = 0;
-
     for (const block of blocks) {
       try {
         const result = await getUserTVLByBlock(block);
@@ -205,8 +278,6 @@ readBlocksFromCSV("hourly_blocks.csv")
       }
     }
     await new Promise((resolve, reject) => {
-      // const randomTime = Math.random() * 1000;
-      // setTimeout(resolve, randomTime);
       const ws = fs.createWriteStream(`outputData.csv`, { flags: "w" });
       write(allCsvRows, { headers: true })
         .pipe(ws)
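
For reference, a minimal sketch of invoking getUserTVLByBlock on a single block, outside the CSV-driven loop above; the BlockData field names are assumed from the adapter's use of block.blockNumber, and the values come from hourly_blocks.csv:

// Illustrative standalone call (not in the commit); block number and
// timestamp taken from hourly_blocks.csv.
const block = { blockNumber: 4828868, blockTimestamp: 1713974398 };

getUserTVLByBlock(block).then((rows) => {
  // Each row follows OutputDataSchemaRow; usd_price is 0 when unavailable.
  console.log(`got ${rows.length} rows at block ${block.blockNumber}`);
});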
