[Question] Support bullmq #4037

Open
lzj960515 opened this issue May 24, 2024 · 7 comments

@lzj960515

I am using bullmq as a queue on Node, and I have integrated it into apm using a custom method, but I have encountered some issues. Here is my code.

const apm = require('elastic-apm-node/start');
const shimmer = require('elastic-apm-node/lib/instrumentation/shimmer');
const { Worker } = require('bullmq');

shimmer.wrap(Worker.prototype, 'callProcessJob', function (original) {
    return async function callProcessJob(job, token) {
        const transName = `${job.queueName}[${job.name}]`;
        const trans = apm.startTransaction(transName, 'queue');
        apm.setCustomContext(job.toJSON());
        try {
            trans.outcome = 'success';
            return await original.call(this, job, token);
        } catch (e) {
            trans.result = e.message;
            trans.outcome = 'failure';
            apm.captureError(e);
            throw e;
        } finally {
            trans.end();
        }
    };
});

Although I call trans.end(), APM still collects the Redis commands that bullmq issues internally, which should not happen. What should I do to avoid this?

image

Thank you for your help.

@zurmokeeper

@lzj960515 bullmq is based on the redis implementation, so it should be normal to collect the redis command stack here, right?

@trentm
Member

trentm commented May 24, 2024

Hi @lzj960515,
I'll be guessing here a little bit because I haven't used bullmq. From a brief look at bullmq/dist/cjs/classes/worker.js and sandbox.js, it looks like bullmq might be doing the actual work of Worker.prototype.callProcessJob in a "child" from the childPool. If that "child" (which could be a worker thread? or a separate Node.js process? not sure) runs in a different context, then spans created while processing the job will be created with a parent other than the transaction that you manually created.

My guess, from your screenshot, is that you have one seemingly random transaction that is getting the Redis spans for every bullmq job. My guess is that that "random transaction" is the transaction that happened to be active when the "child" or children on the childPool were created.

I'm not sure I can explain the "different context" well. Can you show a small working example? That would help me dig into what is going on.

@trentm
Member

trentm commented May 24, 2024

I looked at bullmq worker.js a little more.

  1. I think I'm wrong about it being the context of the "child". There is an internal asyncFifoQueue used in Worker.prototype.run that will be breaking context.
  2. Those BZPOPMIN redis calls are being done in this._getNextJob before this.processJob is called. It is processJob that calls the callProcessJob that you have shimmed. So the BZPOPMIN Redis spans will be created before that transaction is created.

@lzj960515
Author

Thank you for your response. So, how can I remove the BZPOPMIN call chain? I start a transaction before callProcessJob and end it afterwards, so in theory there shouldn’t be any BZPOPMIN calls within my transaction context.

@trentm
Member

trentm commented May 27, 2024

"I started a transaction before callProcessJob and ended it after, theoretically there shouldn’t be BZPOPMIN calls within my transaction context."

I'm not sure from your screenshot that the BZPOPMIN and EVALSHA spans are part of a transaction created by your callProcessJob instrumentation. Are they? Perhaps they are getting attached to the first such transaction after your application starts?

As a quick and poor hack, apm.addSpanFilter() could be used to drop any redis span named "BZPOPMIN" or "EVALSHA". Obviously that is a poor hack that could drop otherwise valid spans.
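A minimal sketch of that span-filter hack follows. It assumes the span payload passed to the filter exposes `name` and `subtype` fields and that the Redis client instrumentation reports `subtype: 'redis'`. The names `dropBullmqPollingSpans` and `DROPPED_REDIS_SPAN_NAMES` are made up for illustration. As noted above, it will also drop legitimate BZPOPMIN/EVALSHA spans issued by your own code.

```javascript
// Hypothetical helper: names of Redis commands whose spans should be dropped.
const DROPPED_REDIS_SPAN_NAMES = new Set(['BZPOPMIN', 'EVALSHA']);

// Span filter: returning a falsy value tells the agent not to send the span;
// returning the payload keeps it.
// Assumption: the payload carries `name` and `subtype` for Redis spans.
function dropBullmqPollingSpans(payload) {
  const isRedisSpan = payload.subtype === 'redis';
  if (isRedisSpan && DROPPED_REDIS_SPAN_NAMES.has(payload.name)) {
    return false;
  }
  return payload;
}

// Registration, in the file that starts the agent:
//   const apm = require('elastic-apm-node/start');
//   apm.addSpanFilter(dropBullmqPollingSpans);
```

Keeping the filter as a plain function makes it easy to unit test without a running agent or Redis server.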

There is an internal apm._instrumentation.supersedeWithEmptyRunContext() that might help, used in the correct place. That obviously is not a great suggestion because it is using an internal API.

If there was a small reproducible example using bullmq I could play a bit to possibly find a solution.

@trentm removed the triage label on May 27, 2024

@lzj960515
Author

Hi @trentm, I wrote some sample code. Could you please take another look at it? Thank you very much.

@trentm
Member

trentm commented Jun 4, 2024

Hi @lzj960515,

I believe you are hitting an issue that is a limitation in the apm.startTransaction() and apm.startSpan() APIs of the elastic-apm-node package. The issue is described for startSpan() at #2611 -- though this is probably much more detail than you care to dig into.

workaround

A workaround described there is to add await Promise.resolve(); before the call to apm.startTransaction().
I've modified your "bullmq.js":

/* eslint-disable @typescript-eslint/no-var-requires */
const apm = require('elastic-apm-node/start');
const shimmer = require('elastic-apm-node/lib/instrumentation/shimmer');
const { Worker } = require('bullmq');

shimmer.wrap(Worker.prototype, 'callProcessJob', function (original) {
  return async function callProcessJob(job, token) {
    await Promise.resolve();           // <------------ add this workaround
    const transName = `${job.queueName}[${job.name}]`;
    const trans = apm.startTransaction(transName, 'queue');
    ...

and that seems to work for me now.

details

The issue with the apm.startTransaction() and apm.startSpan() APIs is that they must change the current running async context to work. So when your wrapper calls apm.startTransaction() the current async context includes the test[transcode] transaction. Then when async function callProcessJob() hits await original.call(this, job, token);, Node.js's Promise handling returns execution to the caller... and the test[transcode] transaction remains current. That transaction then gets used for subsequent execution in the caller, including bullmq's internal asyncFifoQueue handling that is calling redis BZPOPMIN etc.

What the await Promise.resolve() workaround does is trigger that "Node.js's Promise handling returns execution to the caller" before the apm.startTransaction() call, and things work a little more as expected.
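The effect can be reproduced without the APM agent or bullmq. The sketch below is only an analogy: it uses Node's `AsyncLocalStorage.enterWith()` as a stand-in for "change the current async context in place", which is an assumption about how to model `apm.startTransaction()`, not the agent's actual implementation. All function names are hypothetical.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const als = new AsyncLocalStorage();

// Stand-in for apm.startTransaction(): mutates the *current* context in place.
function startFakeTransaction(name) {
  als.enterWith(name);
}

// Without the workaround: the context is changed while the function is still
// running synchronously inside the caller's context.
async function leakyJob() {
  startFakeTransaction('job');
  await new Promise((resolve) => setImmediate(resolve)); // the "real work"
}

// With the workaround: yield to the caller first, then change the context.
async function isolatedJob() {
  await Promise.resolve();
  startFakeTransaction('job');
  await new Promise((resolve) => setImmediate(resolve)); // the "real work"
}

// Reports what the caller sees as "current" right after kicking off the job.
function callerViewAfter(job) {
  return als.run('caller', () => {
    const pending = job();
    const seenByCaller = als.getStore();
    return pending.then(() => seenByCaller);
  });
}

async function demo() {
  return {
    leaky: await callerViewAfter(leakyJob),       // 'job': the change leaked
    isolated: await callerViewAfter(isolatedJob), // 'caller': no leak
  };
}

demo().then((result) => console.log(result));
```

In the leaky case the caller keeps running with the job's context, which mirrors how bullmq's polling loop ends up attributing its Redis calls to the job transaction.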

In a perfect world there would be a better API from the APM agent -- something like apm.withTransaction(name, (trans) => { /* function in which the new transaction is current */ }). An issue for that feature is here: #2983

However, at this point, looking at OpenTelemetry (and perhaps our OpenTelemetry distribution for Node.js, https://github.com/elastic/elastic-otel-node/tree/main/packages/opentelemetry-node, currently in alpha) will be a better long-term bet, as we are focusing more of our efforts on good compatibility with OpenTelemetry. The OTel API includes an equivalent of this better withSpan or withTransaction API idea: in OTel it is called .startActiveSpan(). A small example using it is here: https://github.com/open-telemetry/opentelemetry-js-contrib/blob/7d6ddea613d6702931d8abb468eb3500fbd06f33/examples/bunyan/app.js#L29
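As a rough sketch of what the callback style could look like for a bullmq job, the helper below wraps a job processor in `tracer.startActiveSpan()`. The helper name `processJobWithSpan` and its parameters are hypothetical. The tracer is passed in so the snippet has no package dependency; in a real application it would come from `trace.getTracer(...)` in `@opentelemetry/api`, with an OpenTelemetry SDK already configured.

```javascript
// Numeric value of SpanStatusCode.ERROR in @opentelemetry/api.
const SPAN_STATUS_ERROR = 2;

// Hypothetical helper: runs processFn(job) inside an active span, so the span
// is "current" only for the duration of the callback.
async function processJobWithSpan(tracer, job, processFn) {
  const spanName = `${job.queueName}[${job.name}]`;
  return tracer.startActiveSpan(spanName, async (span) => {
    try {
      return await processFn(job);
    } catch (err) {
      span.recordException(err);
      span.setStatus({ code: SPAN_STATUS_ERROR, message: err.message });
      throw err;
    } finally {
      span.end(); // always end the span, on success or failure
    }
  });
}

// Usage, assuming @opentelemetry/api is installed and an SDK is registered:
//   const { trace } = require('@opentelemetry/api');
//   const tracer = trace.getTracer('bullmq-manual-instrumentation');
//   await processJobWithSpan(tracer, job, (j) => doWork(j));
```

Because the span is scoped to the callback, there is no in-place context change that can leak back to the caller, which is the property the apm.startTransaction() API lacks.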

Note that switching to OpenTelemetry, especially when doing custom/manual instrumentation as you are, does mean relearning a number of concepts.
