diff --git a/adapters/stargate/src/index.ts b/adapters/stargate/src/index.ts index 51d95ea1..1ea39d31 100644 --- a/adapters/stargate/src/index.ts +++ b/adapters/stargate/src/index.ts @@ -47,24 +47,32 @@ readBlocksFromCSV(path.resolve(__dirname, "../hourly_blocks.csv")) console.error("Error reading CSV file:", err); }); -function mergeStreams(positionStreams: PositionsStream[]) { - const csvWriteStream = fs.createWriteStream(`outputData.csv`, { - flags: "w", - }); - - csvWriteStream.write( - "block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n" - ); - - let completedReads = 0; - - for (const source of positionStreams) { - source.on("end", () => { - if (++completedReads === positionStreams.length) { - csvWriteStream.end(); - } + function mergeStreams(positionStreams: PositionsStream[]) { + const csvWriteStream = fs.createWriteStream(`outputData.csv`, { + flags: "w", }); - - source.pipe(csvWriteStream, { end: false }); + + // Write CSV headers + csvWriteStream.write( + "block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n" + ); + + let completedReads = 0; + + for (const source of positionStreams) { + source.on("data", (chunk) => { + const data = chunk.toString().trim(); + + // Write only non-empty rows with a newline at the end + if (data) { + csvWriteStream.write(data + "\n"); + } + }); + + source.on("end", () => { + if (++completedReads === positionStreams.length) { + csvWriteStream.end(); + } + }); + } } -}