Skip to content

Commit

Permalink
Merge pull request #278 from kyle-layerzero/stargate/v2-farms
Browse files Browse the repository at this point in the history
stargate: bridge: add v2 positions
  • Loading branch information
0xroll authored Aug 4, 2024
2 parents 9f7eacc + 25c0c2e commit 0d8d516
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 18 deletions.
55 changes: 41 additions & 14 deletions adapters/stargate/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import path from "path";

import { BlockData } from "./sdk/types";
import { PositionsStream } from "./sdk/lib";
import {
POSITIONS_V1_SUBGRAPH_URL,
POSITIONS_V2_SUBGRAPH_URL,
} from "./sdk/config";

const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
const blocks: BlockData[] = [];

await new Promise<void>((resolve, reject) => {
fs.createReadStream(filePath)
.pipe(csv({ separator: "," })) // Specify the separator as '\t' for TSV files
.pipe(csv({ separator: "," }))
.on("data", (row) => {
//console.log(row);
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
//console.log(`Maybe Data ${blockNumber} ${blockTimestamp}`);

if (!isNaN(blockNumber) && blockTimestamp) {
//console.log(`Valid Data`);
blocks.push({ blockNumber: blockNumber, blockTimestamp });
}
})
Expand All @@ -34,19 +36,22 @@ const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {

readBlocksFromCSV(path.resolve(__dirname, "../hourly_blocks.csv"))
.then(async (blocks) => {
const csvWriteStream = fs.createWriteStream(`outputData.csv`, {
flags: "w",
});

csvWriteStream.write(
"block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n"
);

for (const block of blocks) {
try {
const poisitionsStream = new PositionsStream(block);
const poisitionsStream = new PositionsStream(
block,
POSITIONS_V1_SUBGRAPH_URL
);
const v2PositionsStream = new PositionsStream(
block,
POSITIONS_V2_SUBGRAPH_URL
);

poisitionsStream.pipe(csvWriteStream);
mergeStreams([
//
poisitionsStream,
v2PositionsStream,
]);
} catch (error) {
console.error(
`An error occurred for block ${block.blockNumber}:`,
Expand All @@ -58,3 +63,25 @@ readBlocksFromCSV(path.resolve(__dirname, "../hourly_blocks.csv"))
.catch((err) => {
console.error("Error reading CSV file:", err);
});

function mergeStreams(positionStreams: PositionsStream[]) {
const csvWriteStream = fs.createWriteStream(`outputData.csv`, {
flags: "w",
});

csvWriteStream.write(
"block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n"
);

let completedReads = 0;

for (const source of positionStreams) {
source.on("end", () => {
if (++completedReads === positionStreams.length) {
csvWriteStream.end();
}
});

source.pipe(csvWriteStream, { end: false });
}
}
4 changes: 3 additions & 1 deletion adapters/stargate/src/sdk/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { createPublicClient, http } from "viem";
import { linea } from "viem/chains";

export const SUBGRAPH_URL =
export const POSITIONS_V1_SUBGRAPH_URL =
"https://api.studio.thegraph.com/query/72458/linea-balances/version/latest";
export const POSITIONS_V2_SUBGRAPH_URL =
"https://api.studio.thegraph.com/query/85232/linea-balances-v2/version/latest";

export const client = createPublicClient({
chain: linea,
Expand Down
10 changes: 7 additions & 3 deletions adapters/stargate/src/sdk/lib.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Readable } from "stream";
import { SUBGRAPH_URL, client } from "./config";
import { client } from "./config";
import { Position } from "./types";

const WHITELISTED_TOKEN_ADDRESS = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee";
Expand All @@ -14,7 +14,10 @@ export const getTimestampAtBlock = async (blockNumber: number) => {
export class PositionsStream extends Readable {
skip: string;

constructor(private block: { blockNumber: number; blockTimestamp: number }) {
constructor(
private block: { blockNumber: number; blockTimestamp: number },
private subgraphUrl: string
) {
super({ objectMode: true });
this.skip = "0";
}
Expand All @@ -36,7 +39,7 @@ export class PositionsStream extends Readable {
}
}`;

const response = await fetch(SUBGRAPH_URL, {
const response = await fetch(this.subgraphUrl, {
method: "POST",
body: JSON.stringify({ query }),
headers: { "Content-Type": "application/json" },
Expand All @@ -60,6 +63,7 @@ export class PositionsStream extends Readable {

if (rows.length) {
this.push(rows.join("\n"));
this.push("\n");
this.skip = farmPositions.at(-1).id;
} else {
this.push(null);
Expand Down

0 comments on commit 0d8d516

Please sign in to comment.