Skip to content

Commit

Permalink
add some new functions
Browse files Browse the repository at this point in the history
  • Loading branch information
miler012 committed May 29, 2024
1 parent 06eb534 commit 4569f40
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 78 deletions.
69 changes: 59 additions & 10 deletions adapters/satori/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import fs from 'fs';
import { write } from 'fast-csv';
import { OutputDataSchemaRow,getUserTVLByBlock } from './sdk/subgraphDetails';
import { OutputDataSchemaRow,queryUserTVLByBlock } from './sdk/subgraphDetails';
import csv from 'csv-parser';
import * as swapindex from './sdk/swap/swapindex'


interface BlockData {
blockNumber: number;
blockTimestamp: number;
Expand All @@ -12,9 +12,9 @@ interface BlockData {

async function getUserTvlFromPerpetual(blocks: BlockData[]) {
let snapshots: OutputDataSchemaRow[] = [];
for (const {blockNumber, blockTimestamp} of blocks) {
try {
snapshots = snapshots.concat(await getUserTVLByBlock(blockNumber, blockTimestamp))
for (const { blockNumber, blockTimestamp } of blocks) {
try {
snapshots = snapshots.concat(await queryUserTVLByBlock(blockNumber,blockTimestamp))
} catch (error) {
console.error(`An error occurred for block ${blockNumber}:`, error);
}
Expand Down Expand Up @@ -65,24 +65,73 @@ async function mergeTvl(from: {[p: string]: OutputDataSchemaRow}, to: { [p: stri
return to;
}

export const main = async (blocks: BlockData[]) => {
/**
 * Collects user TVL rows for the given blocks from both protocol
 * sources, merges them per user, and returns the combined rows.
 */
export const queryAllByBloks = async (blocks: BlockData[]) => {
    // TVL held in the perpetual protocol, keyed per user.
    const perpetualTvl = await getUserTvlFromPerpetual(blocks);
    // TVL held in the swap protocol, keyed per user.
    const swapTvl = await getUserTvlFromSwap(blocks);

    // Fold the swap snapshots into the perpetual map so each user ends
    // up with a single combined entry.
    const merged = await mergeTvl(swapTvl, perpetualTvl);

    const csvRows: OutputDataSchemaRow[] = Object.values(merged);
    console.log(`length:---${csvRows.length}`);
    return csvRows;
};

// 4457308
/**
 * Per-block entry point expected by the adapter harness: wraps the
 * batch query so a single BlockData yields the same row shape.
 * (The original destructured blockNumber/blockTimestamp here but never
 * used them — removed as dead code.)
 */
export const getUserTVLByBlock = async (blocks: BlockData) => {
    return await queryAllByBloks([blocks]);
};

/**
 * Reads block data from a CSV file with `number` and `timestamp`
 * columns and returns one BlockData entry per parseable row.
 * Rows whose fields do not parse as integers are skipped silently.
 *
 * @param filePath path to the CSV file (comma-separated; csv-parser's
 *        default separator — the file is not TSV despite earlier notes)
 */
const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
    const blocks: BlockData[] = [];

    await new Promise<void>((resolve, reject) => {
        fs.createReadStream(filePath)
            .pipe(csv())
            .on('data', (row) => {
                const blockNumber = parseInt(row.number, 10);
                const blockTimestamp = parseInt(row.timestamp, 10);
                // Validate both fields the same way. The original used a
                // truthiness check on blockTimestamp, which would also
                // have dropped a legitimate 0 value.
                if (!isNaN(blockNumber) && !isNaN(blockTimestamp)) {
                    blocks.push({ blockNumber, blockTimestamp });
                }
            })
            .on('end', () => {
                resolve();
            })
            .on('error', (err) => {
                reject(err);
            });
    });

    return blocks;
};

readBlocksFromCSV('hourly_blocks.csv').then(async (blocks: any[]) => {
console.log(blocks);
let allCsvRows: any[] = [];

let csvRows: OutputDataSchemaRow[] = await queryAllByBloks(blocks);
console.log(`length:---${csvRows.length}`);
await new Promise((resolve, reject) => {
const ws = fs.createWriteStream(`outputData.csv`, { flags: 'w' });
write(csvRows, { headers: true })
.pipe(ws)
.on("finish", () => {
console.log(`CSV file has been written.`);
resolve;
});
});

}).catch((err) => {
console.error('Error reading CSV file:', err);
});

// main([{blockNumber:4457308,blockTimestamp:1715394711}]).then(() => {
// main([{blockNumber:669512,blockTimestamp:1715394711}]).then(() => {
// console.log("Done");
// });
1 change: 1 addition & 0 deletions adapters/satori/src/sdk/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Goldsky subgraph endpoint for the Satori perpetual deployment on Linea.
export const SUBGRAPH_URL = "https://api.goldsky.com/api/private/project_clw1so6mrsn6o01uafow40xlo/subgraphs/satori-linea-perpet/1.0.0/gn"
// Token contract address reported alongside every row — presumably the
// USDC contract on Linea (matches SYMBOL below); confirm against chain.
export const ASSET = "0x176211869cA2b568f2A7D4EE941E073a821EE1ff"
export const SYMBOL = "USDC"
// Fixed USD price applied to every balance (stablecoin assumed at $1).
export const PRICE = 1
// NOTE(review): hard-coded bearer token committed to source; consider
// moving this secret to an environment variable and rotating it.
export const KEY = "Bearer clw1sqzpysmqg01x5h17y1jvq"


Expand Down
12 changes: 7 additions & 5 deletions adapters/satori/src/sdk/subgraphDetails.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { ASSET, SYMBOL, SUBGRAPH_URL,KEY } from "./config";
import { ASSET, SYMBOL, SUBGRAPH_URL,KEY,PRICE } from "./config";

/**
 * One row of the adapter's CSV output: a user's balance of one token
 * at a specific block.
 *
 * The diff rendering left both the old `token_balance: number` and the
 * new `token_balance: bigint` declarations in place (a duplicate
 * identifier); this is the clean post-commit shape.
 */
export interface OutputDataSchemaRow {
    block_number: number
    timestamp: number
    user_address: string
    token_address: string
    token_symbol: string
    // bigint: raw token amounts can exceed Number.MAX_SAFE_INTEGER
    token_balance: bigint
    usd_price: number
}

export const getUserTVLByBlock = async (
export const queryUserTVLByBlock = async (
blockNumber: number,
timestamp: number,
):Promise<OutputDataSchemaRow[]> => {
Expand Down Expand Up @@ -44,8 +45,9 @@ export const getUserTVLByBlock = async (
user_address:snapshot.user,
token_address:ASSET,
token_symbol:SYMBOL,
token_balance:snapshot.lpAmount
}
token_balance:snapshot.lpAmount,
usd_price: PRICE
}
result.push(userLpSnapshot)
}
if(snapshots.length < 1000){
Expand Down
133 changes: 70 additions & 63 deletions adapters/satori/src/sdk/swap/swapindex.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { promisify } from 'util';
import {promisify} from 'util';
import stream from 'stream';
import csv from 'csv-parser';
import fs from 'fs';
import { write } from 'fast-csv';
import {write} from 'fast-csv';

import { BlockData } from './types';
import { OutputDataSchemaRow as OutputSchemaRow } from '../subgraphDetails';
import { getTimestampAtBlock, getV2UserPositionsAtBlock } from './lib';
import {BlockData} from './types';
import {OutputDataSchemaRow as OutputSchemaRow} from '../subgraphDetails';
import {getTimestampAtBlock, getV2UserPositionsAtBlock} from './lib';
import BigNumber from "bignumber.js";

const pipeline = promisify(stream.pipeline);
Expand Down Expand Up @@ -36,17 +36,17 @@ export const getData = async () => {
for (const block of blocks) {
const timestamp = await getTimestampAtBlock(block)

csvRows.push(...await getUserTVLByBlock({ blockNumber: block, blockTimestamp: timestamp }))
csvRows.push(...await getUserTVLByBlock({blockNumber: block, blockTimestamp: timestamp}))
}

// Write the CSV output to a file
const ws = fs.createWriteStream('outputData.csv');
write(csvRows, { headers: true }).pipe(ws).on('finish', () => {
write(csvRows, {headers: true}).pipe(ws).on('finish', () => {
console.log("CSV file has been written.");
});
};

export const getUserTVLByBlock = async ({ blockNumber, blockTimestamp }: BlockData): Promise<OutputSchemaRow[]> => {
export const getUserTVLByBlock = async ({blockNumber, blockTimestamp}: BlockData): Promise<OutputSchemaRow[]> => {
const result: OutputSchemaRow[] = []

const [v2Positions] = await Promise.all([
Expand All @@ -56,35 +56,41 @@ export const getUserTVLByBlock = async ({ blockNumber, blockTimestamp }: BlockDa
// combine v2 & v3
const combinedPositions = [...v2Positions]
const balances: Record<string, Record<string, bigint>> = {}
let tokenSymbol:Record<string,string> = {};
let tokenDecimals:Record<string,Number> = {};
let tokenSymbol: Record<string, string> = {};
let tokenDecimals: Record<string, Number> = {};
let tokenPrices: Record<string, BigInt> = {};
for (const position of combinedPositions) {
// console.log("position:", position)
balances[position.user] = balances[position.user] || {}

if(tokenSymbol[position.token0.address] == null){
if (tokenSymbol[position.token0.address] == null) {
tokenSymbol[position.token0.address] = position.token0.symbol
}
if(tokenSymbol[position.token1.address] == null){
if (tokenSymbol[position.token1.address] == null) {
tokenSymbol[position.token1.address] = position.token1.symbol
}

if(tokenDecimals[position.token0.address] == null){
if (tokenDecimals[position.token0.address] == null) {
tokenDecimals[position.token0.address] = position.token0.decimals
}
if(tokenDecimals[position.token1.address] == null){
if (tokenDecimals[position.token1.address] == null) {
tokenDecimals[position.token1.address] = position.token1.decimals
}

if (position.token0.balance > 0)
balances[position.user][position.token0.address] =
(balances?.[position.user]?.[position.token0.address] ?? 0)
balances[position.user][position.token0.address] =
(balances?.[position.user]?.[position.token0.address] ?? 0)
+ position.token0.balance

if (position.token1.balance > 0)
balances[position.user][position.token1.address] =
(balances?.[position.user]?.[position.token1.address] ?? 0)
balances[position.user][position.token1.address] =
(balances?.[position.user]?.[position.token1.address] ?? 0)
+ position.token1.balance

if (position.token0.usdPrice > 0)
tokenPrices[position.token0.address] = position.token0.usdPrice
if (position.token1.usdPrice > 0)
tokenPrices[position.token1.address] = position.token1.usdPrice
}

// console.log("balances:", balances)
Expand All @@ -101,7 +107,8 @@ export const getUserTVLByBlock = async ({ blockNumber, blockTimestamp }: BlockDa
user_address: user,
token_address: token,
token_symbol: tokenSymbol[token],
token_balance: balanceSm.multipliedBy(tokenDecimalMp).integerValue(BigNumber.ROUND_DOWN).toNumber(),
token_balance: BigInt(balanceSm.multipliedBy(tokenDecimalMp).integerValue(BigNumber.ROUND_DOWN).toNumber()),
usd_price: new BigNumber(tokenPrices[token].toString()).toNumber()
})
}
}
Expand All @@ -112,54 +119,54 @@ export const getUserTVLByBlock = async ({ blockNumber, blockTimestamp }: BlockDa

const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
const blocks: BlockData[] = [];

await new Promise<void>((resolve, reject) => {
fs.createReadStream(filePath)
.pipe(csv()) // Specify the separator as '\t' for TSV files
.on('data', (row) => {
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
if (!isNaN(blockNumber) && blockTimestamp) {
blocks.push({ blockNumber: blockNumber, blockTimestamp });
}
})
.on('end', () => {
resolve();
})
.on('error', (err) => {
reject(err);
});
fs.createReadStream(filePath)
.pipe(csv()) // Specify the separator as '\t' for TSV files
.on('data', (row) => {
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
if (!isNaN(blockNumber) && blockTimestamp) {
blocks.push({blockNumber: blockNumber, blockTimestamp});
}
})
.on('end', () => {
resolve();
})
.on('error', (err) => {
reject(err);
});
});

return blocks;
};

/*readBlocksFromCSV('hourly_blocks.csv').then(async (blocks: any[]) => {
console.log(blocks);
const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
};

for (const block of blocks) {
try {
const result = await getUserTVLByBlock(block);
// Accumulate CSV rows for all blocks
allCsvRows.push(...result);
} catch (error) {
console.error(`An error occurred for block ${block}:`, error);
}
}
await new Promise((resolve, reject) => {
const ws = fs.createWriteStream(`outputData.csv`, { flags: 'w' });
write(allCsvRows, { headers: true })
.pipe(ws)
.on("finish", () => {
console.log(`CSV file has been written.`);
resolve;
});
});
}).catch((err) => {
console.error('Error reading CSV file:', err);
});*/
/*readBlocksFromCSV('hourly_blocks.csv').then(async (blocks: any[]) => {
console.log(blocks);
const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
for (const block of blocks) {
try {
const result = await getUserTVLByBlock(block);
// Accumulate CSV rows for all blocks
allCsvRows.push(...result);
} catch (error) {
console.error(`An error occurred for block ${block}:`, error);
}
}
await new Promise((resolve, reject) => {
const ws = fs.createWriteStream(`outputData.csv`, { flags: 'w' });
write(allCsvRows, { headers: true })
.pipe(ws)
.on("finish", () => {
console.log(`CSV file has been written.`);
resolve;
});
});
}).catch((err) => {
console.error('Error reading CSV file:', err);
});*/

// getData().then(() => {
// console.log("Done");
Expand Down

0 comments on commit 4569f40

Please sign in to comment.