Message Group / Sequential Convoy pattern #2043

Open
stepin opened this issue Mar 27, 2021 · 35 comments
@stepin

stepin commented Mar 27, 2021

I need to process messages in parallel, but in some cases they must be processed in sequence.
A simple example is the order case: messages for different order ids should be processed in parallel,
but messages for the same order id should be processed in sequence. This pattern is described in more detail by Microsoft at
https://docs.microsoft.com/en-us/azure/architecture/patterns/sequential-convoy
and by ActiveMQ at http://activemq.apache.org/message-groups.html . It's a common use case for Event Sourcing scenarios.

Requirements:

  • at-least-once delivery (exactly-once within a window is even better)
  • messages have a sequence key: if it is null, there are no limits on parallelization; if it is not null, it is the name of a sequence
  • a single workers' group (in the worst case, 2 groups: one for null and one for non-null sequence keys)
  • timeouts for processing
  • up to 5 attempts to process (in error case it's better to send failed messages into the dedicated topic)

I see several options:

  1. NATS JetStream Workers
  • NATS JetStream Workers for jobs that can run in parallel
  • messages with ID header for exactly once detection
  • all preprocessing in a custom code with external storage for sequences

In this case, it's unclear how my code will receive notification that
some message was processed (to check if there are more messages to
process in the same sequence).

  2. NATS Core Services
  • NATS Core Service for jobs that can run in parallel
  • all preprocessing in a custom code with external storage for all messages in progress

It's clear how to implement this option, but it puts too much queue-related code outside of the queue.

Is it possible to implement the original pattern with storage inside JetStream? Or maybe even without custom code
(maybe I missed or misunderstood some options like MaxConsumers)? Is this pattern planned for implementation in the future?

@ripienaar
Contributor

We've discussed this one a bunch, I think we will need a solution to this but do not have an elegant one today - almost there but we have no way to rebalance consumers.

One for the back burner for future releases I think

@ripienaar
Contributor

See nats-io/jetstream#103

@stepin
Author

stepin commented Mar 28, 2021

Ok, I will close this as duplicate.

@stepin stepin closed this as completed Mar 28, 2021
@cortopy

cortopy commented Aug 7, 2021

could this be reopened?

nats-io/jetstream#103 and the repo itself were archived because jetstream is now in GA and part of the nats server, which leads back to here again?

@derekcollison derekcollison reopened this Aug 7, 2021
@derekcollison
Member

We have been discussing this one quite a bit. No resolutions yet but it's definitely on our radar and part of active discussions.

@timsolov

Hi, I have an idea. What does the community think?

We can use a distributed lock service (for example ZooKeeper or etcd).

  1. Consumer pulls 20 messages.
  2. Consumer iterates over messages' payloads.
  3. Consumer looks at a special key inside each payload (e.g. order_id) and tries to take a lock in ZooKeeper, say at /orders/{order_id}.
  4. If the lock already exists, we skip this message by sending msg.Nak() (not acknowledged), because it means a message with the same key is already being handled by another consumer.
  5. If lock doesn't exist then we'll take it and go to handle the message.

This logic could also be implemented inside the nats-server, but I don't know how it would affect performance.
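
The five steps above can be sketched as follows. This is only an illustration: `KeyLocks` is an in-memory stand-in for ZooKeeper/etcd, and `Msg` with its `Outcome` field is a hypothetical type that merely records whether the message would be acked or nak'd. One caveat worth keeping in mind: a nak'd message may be redelivered after a later message with the same key has already been handled, so a lock alone may not be enough for strict ordering.

```go
package main

import (
	"fmt"
	"sync"
)

// KeyLocks is an in-memory stand-in for a distributed lock service
// (ZooKeeper, etcd, ...). A real service would also need lock expiry.
type KeyLocks struct {
	mu   sync.Mutex
	held map[string]bool
}

func NewKeyLocks() *KeyLocks { return &KeyLocks{held: map[string]bool{}} }

// TryLock takes the lock for key and reports whether it succeeded.
func (l *KeyLocks) TryLock(key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held[key] {
		return false
	}
	l.held[key] = true
	return true
}

// Unlock releases the lock for key.
func (l *KeyLocks) Unlock(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, key)
}

// Msg is a hypothetical message; Outcome records "ack" or "nak".
type Msg struct {
	OrderID string
	Outcome string
}

// HandleBatch walks a pulled batch (steps 2-5): messages whose key is locked
// elsewhere are nak'd, the rest are processed under the lock and acked.
func HandleBatch(locks *KeyLocks, batch []*Msg, process func(*Msg)) {
	for _, m := range batch {
		if !locks.TryLock(m.OrderID) {
			m.Outcome = "nak" // someone else is handling this key
			continue
		}
		process(m)
		locks.Unlock(m.OrderID)
		m.Outcome = "ack"
	}
}

func main() {
	locks := NewKeyLocks()
	locks.TryLock("7") // pretend another consumer is already handling order 7

	batch := []*Msg{{OrderID: "7"}, {OrderID: "8"}}
	HandleBatch(locks, batch, func(m *Msg) {
		fmt.Println("processing order", m.OrderID)
	})
	for _, m := range batch {
		fmt.Println(m.OrderID, m.Outcome)
	}
}
```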

@ripienaar
Contributor

ripienaar commented Aug 24, 2021

JetStream already does this. When you pull 20 messages you have a lock on that message for AckWait time.

Is it that you want to arrange it so that one consumer always handles all messages related to a given order ID? If that’s what you are asking, then we are looking at adding something to the server itself. It is currently being discussed in nats-io/nats-architecture-and-design#36

@timsolov

timsolov commented Aug 24, 2021

@ripienaar not really. When we have a GroupSubscription and 3 parallel consumers, they can receive different events simultaneously that, in a microservice architecture, should be handled one by one.
Example:

  1. A customer creates an invitation and removes it right away.
  2. The first consumer starts handling the creation of the invitation but doesn't have time to complete the transaction before another event (remove invitation) arrives at the second consumer. The second consumer then tries to remove a non-existent invitation.

So what does the nats-server offer to handle this situation?

As I see it, there are two ways to resolve this:

  1. Like Kafka: send messages to different partitions by a key from the header.
  2. Something like what I suggested earlier in Message Group / Sequential Convoy pattern #2043 (comment)

@ripienaar
Contributor

Yes, this is the feature we are working on fleshing out in the link given nats-io/nats-architecture-and-design#36

@rbkrabbe

Any news on the feature design? I am also looking for this capability.

@derekcollison
Member

We have some updates in 2.8 for folks that are more aligned to the way other tech does partitioning. @jnmoyne might be able to give some insight.

We are still working on the solution described above, though.

@jnmoyne
Contributor

jnmoyne commented Apr 24, 2022

Yes, the new feature introduced in 2.8 is called deterministic subject token partitioning as an extension to the existing subject mapping functionality (which can be applied at the account level and during import/exports).

Deterministic token partitioning allows you to use subject based addressing to deterministically divide (partition) a flow of messages where one or more of the subject tokens make up the key upon which the partitioning will be based, into a number of smaller message flows.

For example: new customer orders are published on neworders.<customer id>, you can partition those messages over 3 partition numbers (buckets), using the partition(number of partitions, wildcard token positions...) function which returns a partition number (between 0 and number of partitions-1) by using the following mapping "neworders.*" : "neworders.{{wildcard(1)}}.{{partition(3,1)}}".

This particular mapping means that any message published on neworders.<customer id> will be mapped to neworders.<customer id>.<a partition number 0, 1, or 2>, i.e.:

| Published on | Mapped to |
| --- | --- |
| neworders.customerid1 | neworders.customerid1.0 |
| neworders.customerid2 | neworders.customerid2.2 |
| neworders.customerid3 | neworders.customerid3.1 |
| neworders.customerid4 | neworders.customerid4.2 |
| neworders.customerid5 | neworders.customerid5.1 |
| neworders.customerid6 | neworders.customerid6.0 |

The mapping is deterministic because (as long as the number of partitions is 3) 'customerid1' will always map to the same partition number. The mapping is hash based: its distribution is random but tends towards a 'perfectly balanced' distribution (i.e. the more keys you map, the more the number of keys in each partition converges to the same number).

You can partition on more than one subject wildcard token at a time, e.g.: {{partition(10,1,2)}} distributes the union of token wildcards 1 and 2 over 10 partitions.

| Published on | Mapped to |
| --- | --- |
| foo.1.a | foo.1.a.1 |
| foo.1.b | foo.1.b.0 |
| foo.2.b | foo.2.b.9 |
| foo.2.a | foo.2.a.2 |

What this deterministic partition mapping enables is the distribution of the messages that are subscribed to using a single subscriber (on neworders.*) into three separate subscribers (respectively on neworders.*.0, neworders.*.1 and neworders.*.2) that can operate in parallel.

@cortopy

cortopy commented Apr 25, 2022

@derekcollison @jnmoyne this is extremely good and probably one of the biggest features that people migrating from kafka may be looking for. Thanks so much for this

However, I just wanted to say that I'm a bit puzzled by not being able to find anything about this in the docs at nats.io. It would be a shame if this feature went unnoticed

@jnmoyne
Contributor

jnmoyne commented Apr 25, 2022

Thanks. I also believe that this was one of the few things some people coming from Kafka to NATS have been wanting that we didn't have until now.

The feature is brand new (2.8.0 was literally released last week) and the doc update will happen very soon. What I wrote here is actually a preview of what the docs will say about the feature (comments welcome :)).

@derekcollison
Member

@cortopy that was all @jnmoyne ! We will get docs updated as soon as we can.

@jnmoyne
Contributor

jnmoyne commented Apr 25, 2022

Yeah I have a large doc PR in the works

@jnmoyne
Contributor

jnmoyne commented May 6, 2022

FYI docs have been updated now. I think it may be ok to close this issue now?

@rbkrabbe

rbkrabbe commented May 7, 2022

Thank you for the updates. Sorry for disappearing after asking a question 😅

While the partitioning is nice, and the docs look good, ideally I would want to be able to horizontally scale the consumers for a stream and still get in-order delivery by some key. With partitioning, I would be restricted to running a single consumer per partition to get the ordering I need.

In the ADR linked above, I also see this:

> we might require a header to be set for example NATS-Consumer-ID: 1 for sensor 1

I think that will be tricky. It requires the publisher to be aware of the topology of the consumers, but that topology should ideally be dynamic.

I'm wondering if something like message de-duplication could be repurposed for this? The publisher would add something like a NATS-Convoy-ID: 1 header (note that the ID is for the sensor, not the consumer), and a message would be considered a duplicate if there are in-flight messages with the same convoy id.

At least for my use-case, it would be acceptable to break the ordering guarantee if a message has not been ACK'ed, NACK'ed or marked In-progress for a few minutes.

@timsolov

timsolov commented May 7, 2022

@jnmoyne can you provide the link to documentation? I think it would be good to have direct link from this issue. Thank you.

@jnmoyne
Contributor

jnmoyne commented May 18, 2022

@jnmoyne
Contributor

jnmoyne commented May 18, 2022

@rbkrabbe Partitioning does indeed allow you to scale processing while still getting in-order delivery per key; you just need to use more partitions to scale further.

You can have guaranteed strict in-order delivery per partition (even if you happen to have more than one subscriber per partition consumer) simply by setting "Max acks pending" to 1 on the consumer for the partition, no need to use any headers for that (the key the partitioning happens on is one (or more) token(s) in the subject name) or to try to re-use message de-duplication.

@z0mb1ek

z0mb1ek commented Jun 4, 2022

Hi, thanks for this feature, but I have a question: in Kafka, partitions are migrated to another consumer if an instance of our consumer goes down. Is there any solution to this problem in this case?

@rbkrabbe

rbkrabbe commented Jun 8, 2022

@jnmoyne That would cause head-of-line blocking on the partition right? It would effectively mean that only 1 consumer in the group would be active at a time while the others idled.

How easy is it to change the deterministic partitioning to add more partitions and is there a limit to how many partitions it is practical to have?

I have 2 use-cases in mind for this:

  1. Replicate blobs between buckets.
    In this case I would ideally like convoy-per-blob, but could consider partition per bucket with "Max acks pending = 1", or deterministic partitioning by blob name. My concern with partition per bucket is that I would have hot/cold buckets, so some consumers would be busy and might suffer delays while others would be idling. Deterministic partitioning per blob name would probably work OK because the likelihood of the consumer failing would be quite low. If the consumer fails, the entire partition would be blocked as I understand it, which would mean that a failure on 1 bucket would leak onto other buckets.
  2. Event stream per object
    I have a set of objects, much like the "order_id"/"customerid" examples above where I need a convoy per id. In this case there is a higher risk that a consumer might fail, and I can't tolerate a failure on processing for 1 customer blocking processing of another customer. This seems to rule out deterministic partitioning, or am I missing some trick that would allow me to work around the head-of-line blocking?

@jnmoyne
Contributor

jnmoyne commented Jun 8, 2022

A note on the terminology used when talking about JetStream: 'streams' are what record messages, and you create 'consumers' on streams. Consumers are a bit like a database 'view' in the sense that they let applications receive all or a subset of the messages in a stream. Consumers have some internal state (kept and maintained by the JetStream servers), which consists of sequence number counters and tables that are updated as messages are sent by the consumer and acknowledged by the subscribers. The actual applications that process the messages 'subscribe' to a consumer. So those applications, although they can be seen as 'consuming messages', are not consumers but subscribers to consumers.

There are no 'consumer groups' like you have in Kafka and no 'rebalancing'. Rather, you have subscribers to consumers, and typically you would rely on your container orchestration system of choice to start, stop, monitor and restart your applications so that you have whatever number of subscribers per consumer (partition) you want. Each subscriber is passed the consumer name (i.e. partition number) that it should subscribe to as part of its runtime information (e.g. a command line argument or an environment variable set by the container orchestration system you are using).

If you want a guaranteed strictly ordered processing then you indeed need to set the 'max acks pending' value for the partition's consumer to 1, meaning that only one of the subscribers to that consumer gets a message to process at a time. If you are using partitioning for some other reason than strictly ordered processing, then you can increase the number of max acks pending to more than 1 and have more than one subscriber to the consumer receiving a message at the same time.

Even with max acks pending set to 1, you can have more than one subscriber at a time on the consumer (but only one of them would get a message to process at a time) which means that if one of those subscribers were to die, processing would still continue as long as you have at least one subscriber to that consumer.

The partition mapping (e.g. the number of partitions) can be changed at any time, to increase the number of partitions you would: first create streams/consumers for the new partitions you are going to add, then change the mapping to increase the number of partitions, then start workers to subscribe to those new partitions (if you don't need strict ordering, you can even start them before changing the mapping). To decrease the number of partitions you would: first change the mapping to decrease the number of partitions, then monitor the consumers for the partitions you removed and once those partitions have delivered all the messages they may still have buffered to their subscribers you can stop them and remove the streams/consumers for those partitions you removed.

@z0mb1ek

z0mb1ek commented Jun 9, 2022

@jnmoyne I understand your point of view, but if I have 3 instances I want to balance the work and have 1 consumer per instance of my service. In your case I can end up with 3 working consumers on 1 instance while the other 2 instances sit idle.

@jnmoyne
Contributor

jnmoyne commented Jun 9, 2022

You control (for example using your container orchestration system of choice) how many instances of your worker you want to have per partition. So you can have 3 partitions and one subscriber/worker per partition and they will all be doing work.

There is a disconnect between the term "consumer" as it is used in Kafka (coming from the "consumer group" term) and the meaning of that term in JetStream. To simplify: a "consumer" in JetStream is like a "partition" in Kafka and a "subscriber" in JetStream is like a "consumer" in Kafka.

@z0mb1ek

z0mb1ek commented Jun 9, 2022

If I have one subscriber/worker per partition and my instance goes down, and there are no more free resources in my cluster, work will stop. Moreover, in k8s for example, we need a StatefulSet for instance numbering, and when the instance crashes k8s will not start another. In Kafka this case will work.

@jnmoyne
Contributor

jnmoyne commented Jun 9, 2022

There is no concept of "consumer groups" in JetStream; that functionality lives at the administrative/ops level and can easily be implemented using container orchestration. I would argue that this is the right model, rather than having something in the streaming service itself: your container orchestration system can automatically start or restart instances (containers) of your workers if they die, while the consumer group feature of Kafka will never be able to start new worker containers automatically.

@z0mb1ek

z0mb1ek commented Jun 9, 2022

That's true, but in real life there are many different situations (such as the resource constraints I mentioned), and changing consumers on the fly is a good feature. We will try to build this as a framework on top of the NATS cluster API. Thanks for your support.

@bruth
Member

bruth commented Jun 21, 2022

Just wanted to chime in and say you can still have multiple subscribers (workers) deployed for a given consumer (for a given partition) and maintain ordering. You need to set MaxAckPending to 1, which means there will only be a single message in flight across all workers for that partition. The benefit is that if one of the workers goes offline, or you need to do a rolling deployment, you will always have one worker up. So it's like HA for your workers per partition while still maintaining order.

@mnavarrocarter

> The partition mapping (e.g. the number of partitions) can be changed at any time, to increase the number of partitions you would: first create streams/consumers for the new partitions you are going to add, then change the mapping to increase the number of partitions, then start workers to subscribe to those new partitions (if you don't need strict ordering, you can even start them before changing the mapping). To decrease the number of partitions you would: first change the mapping to decrease the number of partitions, then monitor the consumers for the partitions you removed and once those partitions have delivered all the messages they may still have buffered to their subscribers you can stop them and remove the streams/consumers for those partitions you removed.

Hey @jnmoyne, I'm trying to implement the process you described above for an auto-scaling partition proof-of-concept I'm working on. But as far as I know, this all needs to be done via the CLI, right? I can't seem to find any methods in the Go client library for modifying the mappings in a NATS cluster. I do see that the CLI uses the server package. Can I do this myself if I want to program something, or do you recommend sticking to the CLI? Alternatively, is there a chance mappings could be implemented in the client library?

Cheers!

@jnmoyne
Contributor

jnmoyne commented Feb 24, 2024

So in this case here you are trying to scale the consumption of the messages from a stream and not trying to scale the storing of messages into a stream, so this means you use one stream and then leverage subject transformation within the stream (a feature that was introduced by version 2.10 of nats-server) to then create multiple consumers on the stream (i.e. one consumer per partition, or set of partitions).

This means that the mapping is part of the stream definition, and you can define it from the client application using the same calls that you would use to create streams normally. This is in contrast to the Core NATS subject transformation feature, whose mappings are defined as part of the account configuration (and therefore depend on which security mode you are using: with static security the mappings are defined in the server configuration file, while with operator mode security they are defined as part of the account JWT).

In any case, you can do elasticity without having to 're-partition'.

I actually have implemented all this (i.e. elastic 'consumer groups' for JetStream) already and I hope to be able to release it to the public soon, but you can read about how it's done in the ADR PR nats-io/nats-architecture-and-design#263

@mnavarrocarter

@jnmoyne this is awesome!!!

I'm more than happy to help with testing / trialling this once this is released.

@aradalvand

@jnmoyne Hey, it's been a few months so I wanted to ask if the stuff you described in nats-io/nats-architecture-and-design#263 has made it to NATS yet? I can't find any docs or examples for it.

@jnmoyne
Contributor

jnmoyne commented Nov 4, 2024

The pinned consumer functionality, which is needed for built-in fault-tolerance, has just been merged to the main branch and will be included in the 2.11 release. After that I will release an initial implementation of a client library (in Go, but with nothing Go-specific to it) that provides the partitioned consumer-group functionality described in that PR, which now needs to be updated. I apologize, and thank you for your patience :).
