V8ValueString count increases until OutOfMemory error #423
Comments
I think the issue is mostly that there are some resource leak points in your code. Could you leave a repo that can reproduce the issue for me to analyze? |
From the screenshot, the Of course, it's better to leave a repo so that I can spot the issue on my env. |
Hmm, the function is not recursive, but it has an infinite while loop that awaits inside. This works fine in pure Node.js: when I ran the code directly in Node.js, it ran for over 24 hours without any sign of a memory leak. I logged memory stats from the JS code periodically and they were stable. EDIT: As I mention in a later comment, in the current version the infinite while loop was replaced with setInterval() to throttle execution to 10 times per second. Here is the full TypeScript code that is responsible for this:
// import * as ccxt from 'ccxt';
import { Exchange, Ticker } from "ccxt";
import { ExchangeManager } from "../exchanges/exchange-manager";
import { BEC, loggerFactory } from "../globals";
import { Logger } from "winston";
import { BrokerName } from "../ccxt-bootloader-types";

interface StreamingExchange {
    readonly brokerName: string;
    readonly exchange: Exchange;
    readonly intervals: NodeJS.Timeout[];
    readonly logger: Logger;
    tradesReceived: number;
    tradesReceivedTotal: number;
    bidsAsksReceived: number;
    bidsAsksReceivedTotal: number;
}

const BATCH_SIZES: Readonly<Record<string, number>> = {
    BINANCE: 200
};
const TICKERS_PROCESSING_INTERVAL_MS = 100;
const REFRESH_SUBSCRIPTIONS_INTERVAL_MS = 10000;
const tradesDataBrokerLogger = loggerFactory.getLogger("TradesDataBroker");

export class TradesDataBroker {
    private streamingExchanges = new Map<BrokerName, StreamingExchange>();

    constructor(private readonly exchangeManager: ExchangeManager) {
    }

    // Can be called from javet
    // noinspection JSUnusedGlobalSymbols
    async stopStreamingExchange(brokerName: string) {
        const streamingExchange = this.streamingExchanges.get(brokerName);
        if (!streamingExchange) {
            return;
        }
        const { logger, exchange, intervals } = streamingExchange;
        logger.info("Closing exchange %s...", streamingExchange.brokerName);
        const startTime = Date.now();
        try {
            intervals.forEach(clearInterval);
            await exchange.close();
            logger.debug("Exchange closed in %d seconds", (Date.now() - startTime) / 1000);
        } catch (error) {
            logger.error("Failed to close exchange", { error });
        }
    }

    // called from javet
    // noinspection JSUnusedGlobalSymbols
    watchBidsAsks(brokerName: string, symbols: string[]) {
        this.watchBidsAsksImpl(brokerName, symbols)
            .catch((error: unknown) => tradesDataBrokerLogger.error("Error watching bids/asks for exchange %s.", brokerName, { error }));
    }

    private async getStreamingExchange(brokerName: string): Promise<StreamingExchange> {
        let streamingExchange = this.streamingExchanges.get(brokerName);
        if (!streamingExchange) {
            const { exchange } = await this.exchangeManager.createExchange({
                brokerName,
                exchangeId: `${brokerName}-streamer`,
                useCcxtPro: true
            });
            exchange.newUpdates = true;
            streamingExchange = {
                brokerName,
                exchange,
                intervals: [],
                logger: tradesDataBrokerLogger.child({ meta: [brokerName] }),
                tradesReceived: 0,
                tradesReceivedTotal: 0,
                bidsAsksReceived: 0,
                bidsAsksReceivedTotal: 0
            };
            this.streamingExchanges.set(brokerName, streamingExchange);
        }
        return streamingExchange;
    }

    private runInBatches(batchSize: number, symbols: string[], fun: (batch: string[]) => void) {
        let batch: string[] = [];
        for (const symbol of symbols) {
            batch.push(symbol);
            if (batch.length >= batchSize) {
                // hand the full batch to the callback, then start a new array (instead of clearing this one)
                // so the callback and its catch handler keep a reference to their own batch values
                void fun(batch);
                batch = [];
            }
        }
        if (batch.length > 0) {
            void fun(batch);
        }
    }

    private async watchBidsAsksImpl(brokerName: string, symbols: string[]) {
        const startTime = Date.now();
        const streamingExchange = await this.getStreamingExchange(brokerName);
        const { logger, exchange, intervals } = streamingExchange;
        const interval = setInterval(() => {
            logger.debug("Received %d bidsAsks per minute, total run time: %d seconds", streamingExchange.bidsAsksReceived, (Date.now() - startTime) / 1000);
            streamingExchange.bidsAsksReceived = 0;
        }, 60000);
        intervals.push(interval);
        const batchSize = BATCH_SIZES[brokerName] ?? Number.MAX_SAFE_INTEGER;
        this.runInBatches(batchSize, symbols, batch => {
            logger.debug("Watching bidsAsks for symbols %s", batch);
            exchange.watchBidsAsks(batch)
                .catch((error: unknown) => logger.error("Error watching bids/asks for symbols: %s.", batch, { error }));
        });
        const refreshInterval = setInterval(() => {
            this.runInBatches(batchSize, symbols, batch => {
                exchange.watchBidsAsks(batch)
                    .catch((error: unknown) => logger.error("Error watching bids/asks for symbols: %s.", batch, { error }));
            });
        }, REFRESH_SUBSCRIPTIONS_INTERVAL_MS);
        intervals.push(refreshInterval);
        const lastTickersForSymbols: Record<string, Ticker> = {};
        // implemented watchBidsAsks in throttled mode because realtime mode (awaiting promise in infinite loop)
        // had memory leak issue https://github.com/ccxt/ccxt/issues/23972
        const checkNewBidsAsksInterval = setInterval(() => {
            const startTime = Date.now();
            const bidsAsks = exchange.bidsasks;
            const tickers: Ticker[] = [];
            for (const symbol of symbols) {
                const symbolTicker = bidsAsks[symbol];
                if (!symbolTicker || symbolTicker === lastTickersForSymbols[symbol]) {
                    continue;
                }
                lastTickersForSymbols[symbol] = symbolTicker;
                tickers.push(symbolTicker);
            }
            const javaProcessingStartTime = Date.now();
            this.processBidsAsks(streamingExchange, tickers);
            const duration = Date.now() - startTime;
            if (duration > TICKERS_PROCESSING_INTERVAL_MS - 10) {
                logger.warn("Processing bidsAsks took %d ms, nodejs might become overloaded (java processing time: %d ms)", duration, Date.now() - javaProcessingStartTime);
            }
        }, TICKERS_PROCESSING_INTERVAL_MS);
        intervals.push(checkNewBidsAsksInterval);
    }

    private processBidsAsks(streamingExchange: StreamingExchange, tickers: Ticker[]) {
        if (!tickers.length) {
            return;
        }
        streamingExchange.bidsAsksReceived += tickers.length;
        streamingExchange.bidsAsksReceivedTotal += tickers.length;
        const { logger, brokerName } = streamingExchange;
        try {
            BEC.onBidsAsksReceived(brokerName, JSON.stringify(tickers));
        } catch (error) {
            logger.error("Error in processBidsAsks", { error });
        }
    }
} |
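For anyone who wants to check the batching logic in isolation (outside ccxt and Javet), here is a minimal sketch of the same idea as a pure function. The name splitIntoBatches is hypothetical and not part of the posted code. It returns the batches instead of invoking a callback, which may make it easier to rule the batching out as a source of retained references.

```typescript
// Hypothetical standalone helper, for illustration only: splits a list of
// symbols into consecutive batches of at most `batchSize` items.
function splitIntoBatches<T>(items: readonly T[], batchSize: number): T[][] {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
        throw new RangeError("batchSize must be a positive integer");
    }
    const batches: T[][] = [];
    for (let i = 0; i < items.length; i += batchSize) {
        // slice() copies, so every batch is an independent array
        batches.push(items.slice(i, i + batchSize));
    }
    return batches;
}
```

With a batch size of Number.MAX_SAFE_INTEGER (the fallback used above for brokers without a configured size), all symbols end up in a single batch.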
I am sorry, the infinite while loop was in an older version; the current version uses setInterval() to throttle execution to 10 times per second. |
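The throttled approach relies on detecting which tickers changed since the previous tick by comparing object references. Below is a minimal sketch of that step as a pure function. TickerLike and collectChanged are hypothetical names, and the sketch assumes the ticker cache replaces a symbol's object on every update rather than mutating it in place (if the cache mutated in place, reference comparison would miss updates).

```typescript
// Hypothetical, reduced ticker shape; the real ccxt Ticker has more fields.
interface TickerLike {
    symbol: string;
    bid: number;
    ask: number;
}

// Returns the tickers whose object reference differs from the one seen on the
// previous call, and records the new references in `lastSeen`.
function collectChanged(
    symbols: readonly string[],
    latest: Readonly<Record<string, TickerLike | undefined>>,
    lastSeen: Map<string, TickerLike>
): TickerLike[] {
    const changed: TickerLike[] = [];
    for (const symbol of symbols) {
        const current = latest[symbol];
        // skip symbols with no data yet, or with the same object as last time
        if (!current || current === lastSeen.get(symbol)) {
            continue;
        }
        lastSeen.set(symbol, current);
        changed.push(current);
    }
    return changed;
}
```

Called from a setInterval() callback every 100 ms, this yields at most one batch of changed tickers per tick, so at most one JSON string crosses into Java per tick.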
Do you mean Java threads? If so, I have only 2 threads that interact with Javet:
|
Node.js has its own threads. Please print the thread ID. |
I have added the following log:
Output:
|
Here is the repo with a minimal reproducible example - see the instructions in the readme: |
I'll check it out once I have time. |
Just realized that can't be the case, since I have periodic memory stats in the logs from Node.js and those are stable: At the same time, there are tens of gigabytes of V8ValueString instances in Java memory. |
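For reference, periodic memory stats like the ones mentioned above can be produced with a small formatter along these lines. This is only a sketch, not the logging code from the actual project: MemoryStats and formatMemoryStats are hypothetical names, and the fields are the subset of what Node's process.memoryUsage() returns that is relevant here.

```typescript
// Subset of the object returned by Node's process.memoryUsage(); all values are in bytes.
interface MemoryStats {
    rss: number;
    heapTotal: number;
    heapUsed: number;
    external: number;
}

// Formats memory stats as a single log line, in MiB with one decimal place.
function formatMemoryStats(stats: MemoryStats): string {
    const toMiB = (bytes: number): string => (bytes / (1024 * 1024)).toFixed(1);
    return `rss=${toMiB(stats.rss)}MiB heapTotal=${toMiB(stats.heapTotal)}MiB ` +
        `heapUsed=${toMiB(stats.heapUsed)}MiB external=${toMiB(stats.external)}MiB`;
}

// Possible usage inside Node.js (not executed here):
//   setInterval(() => console.log(formatMemoryStats(process.memoryUsage())), 60000);
```

If these numbers stay flat while the V8ValueString count in the Java heap dump keeps growing, the retained objects are on the Java side of the bridge rather than in the V8 heap.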
@ggecy I've confirmed this is a serious bug that has been in Javet since its inception. I'll fix it soon. It went unnoticed because the majority of Javet use cases run Java as the server and Node.js as a script engine. In your case, Node.js is the server and Java is a back-end service. |
The fix was pushed to the dev branch. Please try the snapshot builds at Actions once they are ready. |
Is the snapshot published to some maven repository? Or should I just try to manually overwrite 4.1.0 version jars inside my local m2 repo? |
You will have to download the snapshot and point your project to it. |
Thanks, locally it seems to be working. The increasing number of V8ValueStrings in the jmap output is no longer there. I will have to deploy it to the server and let it run there for a few days to see if it's stable, but it seems fine after about 10 minutes of running. When will it be released? |
Great. I'll try to release as soon as possible. It's better to ping me on Discord for updates. |
It's released. Please let me know if it works. |
Thanks, yes. |
I am running v8Runtime in Node.js mode. I have several callback functions defined in a Java class, annotated with @V8Function, which take only String parameters, with no V8Value anywhere. I do not work with any V8Values within the callbacks, and after some initial setup the Java side makes no more calls. The Java code starts the processing in Node.js, which then calls the Java callbacks many times per second with a large volume of exchange prices serialized to JSON strings. When I leave this running for several hours and take a memory dump via
jmap -dump:live,format=b,file=dump.hprof <pid>
I can see several hundred thousand V8ValueString instances in Java memory whose GC root is Local JNI. They are the strings I send from Node.js to the Java callbacks, and they are never released. The count steadily increases until Java crashes with an OutOfMemory error.