V8ValueString count increases until OutOfMemory error #423

Open
ggecy opened this issue Dec 3, 2024 · 20 comments · Fixed by #424

Comments


ggecy commented Dec 3, 2024

I am running a V8Runtime in Node.js mode. I have several callback functions defined in a Java class and annotated with @V8Function. They take only String parameters, with no V8Value anywhere, and I do not work with any V8Value inside the callbacks. After some initial setup, the Java side makes no further calls. The Java code starts the processing in Node.js, which then calls the Java callbacks many times per second with a large volume of exchange prices serialized to JSON strings.

When I leave this running for several hours and take a memory dump via jmap -dump:live,format=b,file=dump.hprof <pid>, I see several hundred thousand V8ValueString instances in Java memory whose GC root is Local JNI. They are the strings I am sending from Node.js to the Java callbacks, and they are never released. The count increases steadily until Java crashes with an OutOfMemory error.
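For anyone who wants to capture the same kind of dump without shelling out to jmap, a live-object heap dump can also be triggered from inside the JVM. This is a minimal sketch, assuming a HotSpot-based JVM; the class name HeapDumper and the target path are hypothetical and not part of the reporter's project.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, not taken from the issue. */
final class HeapDumper {
    private HeapDumper() {}

    /**
     * Writes a heap dump that contains only reachable ("live") objects,
     * roughly comparable to: jmap -dump:live,format=b,file=<target> <pid>
     * Newer JDKs reject target names that do not end in ".hprof".
     */
    static Path dumpLiveHeap(Path target) {
        try {
            // dumpHeap refuses to overwrite an existing file, so remove any old dump first.
            Files.deleteIfExists(target);
            HotSpotDiagnosticMXBean bean =
                    ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
            bean.dumpHeap(target.toAbsolutePath().toString(), true); // true = live objects only
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling HeapDumper.dumpLiveHeap(Paths.get("dump.hprof")) at intervals and comparing the V8ValueString counts across dumps would show whether the number keeps growing.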

Owner

caoccao commented Dec 4, 2024

I think the most likely cause is a resource leak somewhere in your code. Could you share a repo that reproduces the issue so that I can analyze it?

Author

ggecy commented Dec 4, 2024

I will have to create a new repo from scratch to replicate the problem, since our project is too big and too hard to set up for me to give you the whole code. In the meantime, here are some relevant pieces of code. I don't think there is any place in my code that can leak memory, since I am not using anything from Javet in the callbacks that could be closed or released.

There is a single worker thread that calls await in an infinite loop. This worker thread executes all the JS callbacks, which essentially just parse the data from JSON and add it to queues in Java to avoid blocking the worker thread.

executorService.execute(() -> {
    // ....
    // Keep pumping the Node.js event loop on this thread until asked to stop.
    while (!shouldStop.get() && runtime.await(V8AwaitMode.RunTillNoMoreTasks)) {
    }
    // ....
});
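The hand-off described above, where a callback only enqueues parsed data and other threads do the actual work, could be sketched as follows. This is an illustration under assumptions, not the reporter's code: the class CallbackHandOff and its methods are hypothetical, and it uses only java.util.concurrent.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: passes items from the callback thread to consumer threads. */
final class CallbackHandOff<T> {
    private final BlockingQueue<T> queue;

    CallbackHandOff(int capacity) {
        // Bounded, so a stalled consumer shows up as rejected items rather than heap growth.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Called from the callback: never blocks; returns false if the queue is full. */
    boolean publish(T item) {
        return queue.offer(item);
    }

    /** Called from a consumer thread: waits up to timeoutMillis; null on timeout or interrupt. */
    T poll(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

Using a bounded queue is a design choice made for this sketch: it keeps a slow consumer from being mistaken for a leak in the bridge itself.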

Here is the simplified Java code. Processing starts with a call to watchBidsAsks. An instance of this class is bound to an object that is registered on globalThis as BEC:

...

JSBootloaderExtendedCommands bootloaderExtendedCommands = ...
V8ValueObject jsBootloaderCommands = runtime.createV8ValueObject();
runtime.getGlobalObject().set("BEC", jsBootloaderCommands);
jsBootloaderCommands.bind(bootloaderExtendedCommands);

...

public class CcxtCommands implements JSBootloaderExtendedCommands {

    private static final Logger logger = LoggerFactory.getLogger(CcxtCommands.class);

    private final V8Runtime v8Runtime;
    private final Map<String, List<Consumer<List<JavetTicker>>>> listeners = new HashMap<>();
    private final ObjectMapper objectMapper;

    public CcxtCommands(V8Runtime v8Runtime, ObjectMapper objectMapper) {
        this.v8Runtime = v8Runtime;
        this.objectMapper = objectMapper;
    }

    public void watchBidsAsks(String brokerName, List<String> symbols) throws JavetException {
        try (var tradesDataBroker = (V8ValueObject) v8Runtime.getGlobalObject().get("tradesDataBroker")) {
            tradesDataBroker.invokeVoid("watchBidsAsks", brokerName, symbols);
        }
    }

    @V8Function(name = "onBidsAsksReceived")
    public void onBidsAsksReceived(String brokerName, String dataJSON) {
        if (!StringUtils.hasText(dataJSON)) {
            return;
        }
        var listeners = this.listeners.get(brokerName);
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        try {
            var parsedData = objectMapper.readValue(dataJSON, new TypeReference<List<JavetTicker>>() {});
            for (var listener : listeners) {
                try {
                    listener.accept(parsedData);
                } catch (Exception e) {
                    logger.error("[onBidsAsksReceived] Error while calling listener: '{}'", listener, e);
                }
            }
        } catch (Exception e) {
            logger.error("[onBidsAsksReceived] Error while parsing data: '{}'", dataJSON, e);
        }
    }
}

Then in nodejs typescript:

...
watchBidsAsks(brokerName: string, symbols: string[]) {
     this.watchBidsAsksImpl(brokerName, symbols)
         .catch((error: unknown) => tradesDataBrokerLogger.error("Error watching bids/asks for exchange %s.", brokerName, { error }));
 }

private async watchBidsAsksImpl(brokerName: string, symbols: string[]) {
...
   BEC.onBidsAsksReceived(brokerName, JSON.stringify(bidsAsks));
...
}
...

Here you can see 10 GB of V8ValueString instances containing the data sent to the callbacks. They never reach my code, since they are used only by Javet's internal code when it converts them to the plain String parameter values of my callbacks.
[Screenshot 2024-12-04 at 09 30 55]

Owner

caoccao commented Dec 4, 2024

From the screenshot, the V8ValueString instances are not GC rooted. Every V8ValueString holds a reference to a NodeRuntime instance, which is GC rooted, and that is fine. Since the V8ValueString instances themselves are not held by a GC root, my guess is that they are on the stack of your function. Please check whether your function involves recursive calls.

Of course, it would be better to share a repo so that I can reproduce the issue in my environment.

Author

ggecy commented Dec 4, 2024

Hmm, the function is not recursive, but it has an infinite while loop that uses await inside. This works fine in pure Node.js: when I ran the code directly in Node.js, it ran for over 24 hours without any sign of a memory leak. I logged memory stats periodically from the JS code and they were stable.

EDIT: As I mention in a later comment, in the current version the infinite while loop was replaced with setInterval() to throttle the execution to 10 times per second.

Here is the full TypeScript code that is responsible for this:

// import * as ccxt from 'ccxt';
import { Exchange, Ticker } from "ccxt";
import { ExchangeManager } from "../exchanges/exchange-manager";
import { BEC, loggerFactory } from "../globals";
import { Logger } from "winston";
import { BrokerName } from "../ccxt-bootloader-types";

interface StreamingExchange {
    readonly brokerName: string;
    readonly exchange: Exchange;
    readonly intervals: NodeJS.Timeout[];
    readonly logger: Logger;
    tradesReceived: number;
    tradesReceivedTotal: number;
    bidsAsksReceived: number;
    bidsAsksReceivedTotal: number;
}

const BATCH_SIZES: Readonly<Record<string, number>> = {
    BINANCE: 200
};

const TICKERS_PROCESSING_INTERVAL_MS = 100;
const REFRESH_SUBSCRIPTIONS_INTERVAL_MS = 10000;
const tradesDataBrokerLogger = loggerFactory.getLogger("TradesDataBroker");

export class TradesDataBroker {

    private streamingExchanges = new Map<BrokerName, StreamingExchange>();

    constructor(private readonly exchangeManager: ExchangeManager) {
    }

    // Can be called from javet
    // noinspection JSUnusedGlobalSymbols
    async stopStreamingExchange(brokerName: string) {
        const streamingExchange = this.streamingExchanges.get(brokerName);
        if (!streamingExchange) {
            return;
        }
        const { logger, exchange, intervals } = streamingExchange;
        logger.info("Closing exchange %s...", streamingExchange.brokerName);
        const startTime = Date.now();
        try {
            intervals.forEach(clearInterval);
            await exchange.close();
            logger.debug("Exchange closed in %d seconds", (Date.now() - startTime) / 1000);
        } catch (error) {
            logger.error("Failed to close exchange", { error });
        }
    }

    // called from javet
    // noinspection JSUnusedGlobalSymbols
    watchBidsAsks(brokerName: string, symbols: string[]) {
        this.watchBidsAsksImpl(brokerName, symbols)
            .catch((error: unknown) => tradesDataBrokerLogger.error("Error watching bids/asks for exchange %s.", brokerName, { error }));
    }

    private async getStreamingExchange(brokerName: string): Promise<StreamingExchange> {
        let streamingExchange = this.streamingExchanges.get(brokerName);
        if (!streamingExchange) {
            const { exchange } = await this.exchangeManager.createExchange({
                brokerName,
                exchangeId: `${brokerName}-streamer`,
                useCcxtPro: true
            });
            exchange.newUpdates = true;
            streamingExchange = {
                brokerName,
                exchange,
                intervals: [],
                logger: tradesDataBrokerLogger.child({ meta: [brokerName] }),
                tradesReceived: 0,
                tradesReceivedTotal: 0,
                bidsAsksReceived: 0,
                bidsAsksReceivedTotal: 0
            };
            this.streamingExchanges.set(brokerName, streamingExchange);
        }
        return streamingExchange;
    }

    private runInBatches(batchSize: number, symbols: string[], fun: (batch: string[]) => void) {
        let batch: string[] = [];
        for (const symbol of symbols) {
            batch.push(symbol);
            if (batch.length >= batchSize) {
                // Hand the filled array to fun, then start a new array below, so the callback keeps its own batch after the loop moves on.
                void fun(batch);
                batch = [];
            }
        }
        if (batch.length > 0) {
            void fun(batch);
        }
    }

    private async watchBidsAsksImpl(brokerName: string, symbols: string[]) {
        const startTime = Date.now();
        const streamingExchange = await this.getStreamingExchange(brokerName);
        const { logger, exchange, intervals } = streamingExchange;

        const interval = setInterval(() => {
            logger.debug("Received %d bidsAsks per minute, total run time: %d seconds", streamingExchange.bidsAsksReceived, (Date.now() - startTime) / 1000);
            streamingExchange.bidsAsksReceived = 0;
        }, 60000);
        intervals.push(interval);

        const batchSize = BATCH_SIZES[brokerName] ?? Number.MAX_SAFE_INTEGER;
        this.runInBatches(batchSize, symbols, batch => {
            logger.debug("Watching bidsAsks for symbols %s", batch);
            exchange.watchBidsAsks(batch)
                .catch((error: unknown) => logger.error("Error watching bids/asks for symbols: %s.", batch, { error }));
        });
        const refreshInterval = setInterval(() => {
            this.runInBatches(batchSize, symbols, batch => {
                exchange.watchBidsAsks(batch)
                    .catch((error: unknown) => logger.error("Error watching bids/asks for symbols: %s.", batch, { error }));
            });
        }, REFRESH_SUBSCRIPTIONS_INTERVAL_MS);
        intervals.push(refreshInterval);

        const lastTickersForSymbols: Record<string, Ticker> = {};
        // implemented watchBidsAsks in throttled mode because realtime mode (awaiting promise in infinite loop)
        // had memory leak issue https://github.com/ccxt/ccxt/issues/23972
        const checkNewBidsAsksInterval = setInterval(() => {
            const startTime = Date.now();
            const bidsAsks = exchange.bidsasks;
            const tickers: Ticker[] = [];
            for (const symbol of symbols) {
                const symbolTicker = bidsAsks[symbol];
                if (!symbolTicker || symbolTicker === lastTickersForSymbols[symbol]) {
                    continue;
                }
                lastTickersForSymbols[symbol] = symbolTicker;
                tickers.push(symbolTicker);
            }
            const javaProcessingStartTime = Date.now();
            this.processBidsAsks(streamingExchange, tickers);
            const duration = Date.now() - startTime;
            if (duration > TICKERS_PROCESSING_INTERVAL_MS - 10) {
                logger.warn("Processing bidsAsks took %d ms, nodejs might become overloaded (java processing time: %d ms)", duration, Date.now() - javaProcessingStartTime);
            }
        }, TICKERS_PROCESSING_INTERVAL_MS);
        intervals.push(checkNewBidsAsksInterval);
    }

    private processBidsAsks(streamingExchange: StreamingExchange, tickers: Ticker[]) {
        if (!tickers.length) {
            return;
        }
        streamingExchange.bidsAsksReceived += tickers.length;
        streamingExchange.bidsAsksReceivedTotal += tickers.length;
        const { logger, brokerName } = streamingExchange;
        try {
            BEC.onBidsAsksReceived(brokerName, JSON.stringify(tickers));
        } catch (error) {
            logger.error("Error in processBidsAsks", { error });
        }
    }
}

Author

ggecy commented Dec 4, 2024

> Hmm, the function is not recursive, but it has an infinite while loop that uses await inside - this works just fine in pure nodejs - when I tried to launch the code directly in nodejs it ran for over 24 hours without any signs of memory leaks, I had memory stats logged in js code periodically and it was stable. Here is the full typescript code that is responsible for this:

I am sorry, the infinite while loop was in an older version. The current version uses setInterval() to throttle the execution to 10 times per second.

Owner

caoccao commented Dec 4, 2024

setInterval() usually calls the callback function via another thread owned by Node.js. Please print the thread ID to get more ideas.

Author

ggecy commented Dec 4, 2024

> setInterval() usually calls the callback function via another thread owned by Node.js. Please print the thread ID to get more ideas.

Do you mean the Java thread? If so, I have only two threads interacting with Javet:

  1. All requests initiated from Java are first passed to a queue, which is processed by a single thread from Executors.newSingleThreadExecutor(). Without this, I got unexpected errors when requests were started by multiple threads.
  2. All callbacks and promise resolutions/rejections are handled by a single worker thread that calls v8Runtime.await(V8AwaitMode.RunTillNoMoreTasks). It basically sits on that line indefinitely, since there is always some active setInterval(). All the callbacks do is parse JSON and push the results to other queues for further processing by other Java threads.
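The first point, funneling every Java-initiated request through one thread, can be sketched with the standard library alone. The class SerializedRuntimeAccess below is hypothetical; in the real project the submitted task would be the Javet call, which is replaced here by an arbitrary Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical wrapper: all requests run on one dedicated thread, in submission order. */
final class SerializedRuntimeAccess implements AutoCloseable {
    private final ExecutorService requestThread = Executors.newSingleThreadExecutor();

    /** Runs the request on the single request thread and waits for its result. */
    <T> T call(Callable<T> request) {
        Future<T> future = requestThread.submit(request);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the request", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Request failed", e.getCause());
        }
    }

    @Override
    public void close() {
        requestThread.shutdown(); // lets queued requests finish, then stops the thread
    }
}
```

One caveat with this shape: calling call() from inside a request that is already running on the request thread would block forever, so nested requests need a different path.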

Owner

caoccao commented Dec 4, 2024

Node.js has its own threads. Please print the thread ID.

Author

ggecy commented Dec 4, 2024

> Node.js has its own threads. Please print the thread ID.

I have added the following log:

import { isMainThread, threadId } from 'node:worker_threads';
...
logger.info("processBidsAsks isMainThread: %s, threadId: %s", isMainThread, threadId);
BEC.onBidsAsksReceived(brokerName, JSON.stringify(tickers));
...

Output:

[TradesDataBroker][BINANCE]: processBidsAsks isMainThread: true, threadId: 1
[TradesDataBroker][BINANCE]: processBidsAsks isMainThread: true, threadId: 1
[TradesDataBroker][BINANCE]: processBidsAsks isMainThread: true, threadId: 1
[TradesDataBroker][BINANCE]: processBidsAsks isMainThread: true, threadId: 1
...
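The log above covers the Node.js side. The same question can be asked on the Java side by recording which Java thread each callback runs on. A small sketch, assuming thread names are distinct enough to tell threads apart; the class CallbackThreadProbe is hypothetical and would be called at the top of a callback such as onBidsAsksReceived.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical diagnostic: remembers the names of all threads that entered a callback. */
final class CallbackThreadProbe {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Records the current thread; returns true the first time this thread name is seen. */
    boolean record() {
        return seen.add(Thread.currentThread().getName());
    }

    /** Returns a sorted, read-only snapshot of the thread names seen so far. */
    Set<String> threads() {
        return Collections.unmodifiableSet(new TreeSet<>(seen));
    }
}
```

If threads() ever reports more than one name, callbacks are arriving on more threads than the single worker described earlier.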

Author

ggecy commented Dec 4, 2024

Here is a repo with a minimal reproducible example. See the instructions in the README:
https://github.com/ggecy/javet-issue-oom

Owner

caoccao commented Dec 4, 2024

I'll check it out once I have time.

Author

ggecy commented Dec 5, 2024

> From the screenshot, the V8ValueString is not GC rooted. Every V8ValueString holds an instance of NodeRuntime which is GC rooted. That's fine. So these instances of V8ValueString are not held by GC root. I guess they are in the stack of your function. Please check if your function implies recursive calls.
>
> Of course, it's better to leave a repo so that I can spot the issue on my env.

I just realized that can't be the case, since I log memory stats from Node.js periodically and they are stable:
Node.js memory usage:
rss: 14145.3828125 MB
heapTotal: 977.5703125 MB
heapUsed: 474.80835723876953 MB
external: 3.432766914367676 MB
arrayBuffers: 2.3920211791992188 MB

At the same time, there are tens of gigabytes of V8ValueString instances in Java memory.
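To put the two sides next to each other in the logs, the JVM can report its own figures in the same style as the Node.js stats. This is a minimal sketch using the standard java.lang.management API; the class JvmMemoryStats and its output format are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Locale;

/** Hypothetical helper: formats current JVM memory usage for periodic logging. */
final class JvmMemoryStats {
    private JvmMemoryStats() {}

    /** Converts a byte count to mebibytes. */
    static double toMb(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    /** Returns one log line with heap and non-heap usage at the time of the call. */
    static String snapshot() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        return String.format(Locale.ROOT,
                "heapUsed: %.1f MB, heapCommitted: %.1f MB, nonHeapUsed: %.1f MB",
                toMb(heap.getUsed()), toMb(heap.getCommitted()), toMb(nonHeap.getUsed()));
    }
}
```

Logging JvmMemoryStats.snapshot() from a scheduled task at the same interval as the Node.js stats would make a pattern like the one reported here, a flat Node.js heap next to a growing Java heap, visible without a heap dump.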

caoccao self-assigned this Dec 5, 2024
caoccao added the bug (Something isn't working) label Dec 5, 2024
Owner

caoccao commented Dec 5, 2024

@ggecy I've confirmed this is a serious bug that has been in Javet since its inception. I'll fix it soon. It went unnoticed because the majority of Javet use cases run Java as the server and Node.js as a script engine. In your case, Node.js acts as the server and Java as a back-end service.

Owner

caoccao commented Dec 5, 2024

The fix was pushed to the dev branch. Please try the snapshot builds at Actions once they are ready.

Author

ggecy commented Dec 5, 2024

> The fix was pushed to the dev branch. Please try the snapshot builds at Actions once they are ready.

Is the snapshot published to a Maven repository? Or should I manually overwrite the 4.1.0 jars in my local m2 repo?

Owner

caoccao commented Dec 5, 2024

You will have to download the snapshot and point your project to it.

Author

ggecy commented Dec 5, 2024

Thanks, locally it seems to be working. The growing number of V8ValueString instances no longer shows up in the jmap output. I will have to deploy it to the server and let it run there for a few days to see if it's stable, but it looks fine after about 10 minutes of running. When will it be released?

Owner

caoccao commented Dec 5, 2024

Great. I'll try to release it as soon as possible. It's best to ping me on Discord for updates.

Owner

caoccao commented Dec 7, 2024

It's released. Please let me know if it works.

Author

ggecy commented Dec 13, 2024

Thanks, yes.

caoccao linked a pull request Dec 14, 2024 that will close this issue