
Batch Forwarding Jobs -> External Message Broker #461

Open
3 of 6 tasks
jkonowitch opened this issue Apr 25, 2024 · 5 comments
Comments

@jkonowitch

Feature description

In the documentation section Don't just jump to another queue!, you make the following suggestion:

I have plans that I've not had time to implement w.r.t. batch exporting jobs to external queues.

My use case, that I will describe below, entails publishing events to RabbitMQ.

Motivating example

In my case, the motivation is not performance but software requirements. I am using Postgres as an Event Store, where events are committed atomically with updates to the data model. Currently, I plan to use a trigger to put these events into a Graphile Worker queue, with a single task whose worker publishes the events to RabbitMQ.

The reason I am using RabbitMQ, and not just Graphile Worker, is that I have requirements that involve more complex routing (e.g. topic exchanges), as well as a requirement that messages be readable on platforms other than Node.js.

Graphile Worker will work very well for this, as it ensures events do indeed get published: deduping, retries, backoffs, etc. are built in. But your suggestion in the documentation was intriguing. I'm sure performance will be just fine - I am not expecting more than 5k messages per second. Nonetheless, batch exporting seems like a sensible idea, and I wanted to raise the question and see if I could assist in developing it.

Breaking changes

None, as far as I know. This would be a new feature.

Supporting development

I [tick all that apply]:

  • am interested in building this feature myself
  • am interested in collaborating on building this feature
  • am willing to help testing this feature before it's released
  • am willing to write a test-driven test suite for this feature (before it exists)
  • am a Graphile sponsor ❤️
  • have an active support or consultancy contract with Graphile
@benjie (Member) commented Apr 26, 2024

👋 Cool!

So it sounds like you don't really need this, but it's a nice-to-have for everyone.

My plan for this mode is that it would effectively just do:

begin;

delete from graphile_worker.jobs
where id in (
  select id
  from graphile_worker.jobs
  where <conditions>
  order by <order>
  for update skip locked
  limit <batch size>
)
returning *;

-- UPLOAD TO REMOTE QUEUE

commit;

-- TODO: clean up named queues here, if any

There's lots of things to answer though:

  • Are long-lived transactions the best way to do this, or should we use locked_at as we do traditionally, to reduce the cost of holding transactions open in the database?
  • Do we want to support this only on one or two task identifiers and have Graphile Worker be able to handle other jobs directly?
  • What values should we use/recommend for batch size?
  • Is ordering actually important in this mode, or can we just grab whatever jobs the DB wants to give us (faster) and trust that we're exporting jobs faster than they're being created and thus ordering is irrelevant?
  • Do we need to do anything specially to support multiple workers doing this?
  • Exactly what interfaces do we want to expose to the user?
  • What checks/timeouts should we put in place?
  • What if we just did batch processing of all jobs (like POC - batched queue processing #99), and "export" mode was just a task that supported this with a large batch size? Would a dedicated export mode be significantly faster, or not?
  • Probably a bunch of other questions I've not thought of yet.

I suggest that you put forward a relatively complete proposal (including how any edge cases will be handled: e.g. what if someone is still running Worker in normal mode whilst a Worker in bulk-export mode starts up?), and then we can discuss the finer points; if all is aligned, you could go ahead and implement the proposal (if you're still interested at that point!).

What do you think?

@benjie (Member) commented Apr 26, 2024

See also: #315

@jkonowitch (Author)

Thank you for your detailed and thoughtful response! A lot of food for thought here. And yes, this is very much a nice to have, not essential, so I will noodle on your suggestions.

Admittedly, I am not very experienced with more advanced DB techniques - for example, I don't quite understand how the SQL example you sketched out would work. Would the I/O to the message broker occur in the database? Does this statement run in some sort of loop (i.e. how does this procedure get invoked)? And, in what cases does Graphile Worker use "native" database locks, vs. a column like locked_at, and why?

I do think batch processing in general could subsume this feature; this would simply be a special case, like the "export" mode you suggest.

As a side/related note: I have been re-examining our need for RabbitMQ in the first place. It turns out the major requirement is to multicast events. I was thinking about implementing this with Graphile Worker by dynamically creating conditional triggers from application code. So, for example, when a new event is added to the events table, multiple triggers would react and add jobs under specifically named task identifiers, one for each module that has indicated it wants to listen to that particular event.

Anyway, this is slightly off-topic, but I will report back if this approach works well, as I'd ideally like to minimize infrastructure, and this could be a useful pattern for others. I haven't used conditional triggers extensively, but since the graphile_worker.add_job function is pretty simple, and the triggers would only run when certain conditions are met, I believe this would be fairly efficient.

@benjie (Member) commented May 15, 2024

Would the I/O to the message broker occur in the database?

No; the code above would be executed from JS, and the -- UPLOAD TO REMOTE QUEUE part would be an await addToQueue(jobs); call or similar.

Does this statement run in some sort of loop (i.e. how does this procedure get invoked)?

Yes. Probably a loop that runs every so often (every minute?), plus also listening for the NOTIFY to reduce latency to milliseconds in general.
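That loop could be sketched roughly as below. This is an assumption-laden sketch, not Graphile Worker's actual API: claimBatch and publishBatch are hypothetical injected functions, where claimBatch would run the delete-and-return SQL from the earlier comment via pg, and a LISTEN/NOTIFY handler could call drainQueue directly to cut latency.

```javascript
// Hypothetical sketch of the export loop. claimBatch() claims-and-deletes
// one batch of jobs (e.g. via the SQL shown earlier); publishBatch(jobs)
// uploads them to the remote queue. Both are injected, so the loop logic
// itself has no database dependency.
async function drainQueue(claimBatch, publishBatch) {
  let total = 0;
  for (;;) {
    const jobs = await claimBatch();
    if (jobs.length === 0) return total; // queue is empty for now
    await publishBatch(jobs);
    total += jobs.length;
  }
}

// Run drainQueue every intervalMs; returns a function that stops the loop.
// A LISTEN/NOTIFY handler would also call drainQueue directly on notify.
function startExportLoop(claimBatch, publishBatch, intervalMs = 60_000) {
  const timer = setInterval(() => {
    drainQueue(claimBatch, publishBatch).catch((err) => console.error(err));
  }, intervalMs);
  return () => clearInterval(timer);
}
```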

in what cases does Graphile Worker use "native" database locks, vs. a column like locked_at, and why?

This is a delete ... returning * inside a transaction, so it uses native locking; it has for update skip locked on the subquery, so it won't conflict with other for update skip locked statements. There's no need to update the DB and then delete the records - better to just delete them, and roll back if writing to the other queue fails.
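Put together, the delete-then-publish transaction might look like the sketch below. The function shape and names (exportBatch, publish) are assumptions; `client` is anything with a query(sql, params) method, such as a node-postgres client.

```javascript
// Hypothetical sketch: claim a batch by deleting it (native row locks are
// held until commit), publish to the external broker, then commit. If
// publishing throws, roll back so the jobs remain in the queue.
async function exportBatch(client, publish, batchSize = 100) {
  await client.query("begin");
  try {
    const { rows: jobs } = await client.query(
      `delete from graphile_worker.jobs
       where id in (
         select id from graphile_worker.jobs
         for update skip locked
         limit $1
       )
       returning *`,
      [batchSize]
    );
    if (jobs.length > 0) {
      await publish(jobs); // UPLOAD TO REMOTE QUEUE
    }
    await client.query("commit");
    return jobs.length;
  } catch (err) {
    await client.query("rollback");
    throw err;
  }
}
```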

I do think batch processing in general could subsume this feature; this would simply be a special case, like the "export" mode you suggest.

Batch processing would use the update rather than the delete approach, because it would allow some jobs to fail while others succeed - therefore we couldn't pro-actively delete the rows. Still, its performance may well be sufficient for this use case.
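For contrast, the update-based claim could be sketched as below. The locked_at/locked_by columns do exist on graphile_worker.jobs, but the function itself is a hypothetical sketch, not real Graphile Worker code; `client` is anything with a query(sql, params) method.

```javascript
// Hypothetical sketch of the update-based claim used for batch processing:
// jobs are marked as locked rather than deleted, so each one can later be
// completed (deleted) or failed (unlocked, attempts bumped) independently.
async function claimBatchForProcessing(client, workerId, batchSize = 100) {
  const { rows } = await client.query(
    `update graphile_worker.jobs
     set locked_at = now(), locked_by = $1
     where id in (
       select id from graphile_worker.jobs
       where locked_at is null
       for update skip locked
       limit $2
     )
     returning *`,
    [workerId, batchSize]
  );
  return rows;
}
```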

@jkonowitch (Author)

Got it, that makes sense - in particular your distinction between batch processing, where per-job error handling/retrying is necessary, and batch exporting to another queue, where we can optimistically delete all the events in a transaction.

In my testing, both locally and with production workloads, the performance characteristics of Graphile Worker as-is are more than sufficient, so I will deprioritize this investigation. However, I'm leaving this issue open, as the context gathered here will be helpful if anyone wants to champion this feature going forward.
