ADR-42: JetStream Consumer Priority Groups
Signed-off-by: R.I.Pienaar <[email protected]>
ripienaar committed Jul 15, 2024
1 parent 39bbc09 commit c504b6b
Showing 5 changed files with 187 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/validate.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v1
with:
-go-version: 1.16
+go-version: "1.22"

- name: Valid metadata and readme updated
shell: bash --noprofile --norc -x -eo pipefail {0}
2 changes: 2 additions & 0 deletions README.md
@@ -59,6 +59,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc
|[ADR-34](adr/ADR-34.md)|jetstream, client, server|JetStream Consumers Multiple Filters|
|[ADR-36](adr/ADR-36.md)|jetstream, client, server|Subject Mapping Transforms in Streams|
|[ADR-37](adr/ADR-37.md)|jetstream, client, spec|JetStream Simplification|
|[ADR-42](adr/ADR-42.md)|jetstream, server|Pull Consumer Priority Groups|

## Kv

@@ -113,6 +114,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc
|[ADR-39](adr/ADR-39.md)|server, security|Certificate Store|
|[ADR-40](adr/ADR-40.md)|client, server, spec|NATS Connection|
|[ADR-41](adr/ADR-41.md)|observability, server|NATS Message Path Tracing|
|[ADR-42](adr/ADR-42.md)|jetstream, server|Pull Consumer Priority Groups|

## Spec

166 changes: 166 additions & 0 deletions adr/ADR-42.md
@@ -0,0 +1,166 @@
# Pull Consumer Priority Groups

| Metadata | Value |
|----------|-------------------|
| Date | 2024-05-14 |
| Author | @ripienaar |
| Status | Approved |
| Tags | jetstream, server |


| Revision | Date | Author | Info |
|----------|------------|------------|----------------|
| 1 | 2024-05-14 | @ripienaar | Initial design |

## Context and Problem Statement

We have a class of feature requests that all come down to adding behaviours on a consumer that affect delivery to groups
of clients who interact with the consumer.

Some examples:

* A single client should receive all messages from a consumer and if it fails another should take over
* Groups of related clients should be organised such that certain clients only receive messages when various conditions are met
* Groups of clients should access a Consumer but some should have higher priority than others for receiving messages
* A consumer should deliver messages to clients in a specific grouping such that related messages all go to the same client group

The feature proposed here addresses some of these needs while paving the way for future needs to be addressed within this
framework and under this umbrella feature called `Pull Consumer Groups`.

The current focus is around providing building blocks that can be used to solve higher order problems client-side.

Related prior work:

* Proposed [Consumer Groups](https://github.com/nats-io/nats-architecture-and-design/pull/36)
* Proposed [Partitioned consumer groups and exclusive consumer](https://github.com/nats-io/nats-architecture-and-design/pull/263)
* Proposed [Consumer Owner ID](https://github.com/nats-io/nats-server/pull/5157)

## General Overview

We introduce 2 settings on `ConsumerConfig` that activate these related features:

```go
{
PriorityGroups: ["<group>", ...],
PriorityPolicy: "<policy>"
}
```

The presence of the `PriorityGroups` setting, a list of group names, activates the set of features documented here.
The supported behaviours are configured using `PriorityPolicy`.

Technically, in some of the proposed policies `PriorityGroups` has no real meaning today, but we keep it for consistency
and to allow future per-group features to be added without then requiring all clients to be updated.
Future message grouping features would require groups to be listed here.

In the initial implementation we should limit `PriorityGroups` to one group per consumer and return an error if a consumer
is created with multiple groups. In future iterations multiple groups will be supported along with dynamic partitioning
of stream data.

This is only supported on Pull Consumers; configuring this on a Push consumer must raise an error.
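
The two rules above can be sketched as a validation step. This is a minimal illustration, not the server implementation: the trimmed-down `ConsumerConfig`, the use of `DeliverSubject` to detect a Push consumer, the error values and `validatePriorityGroups` are all assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerConfig is a hypothetical, trimmed-down stand-in for the real
// configuration type; only fields relevant to this section are shown.
type ConsumerConfig struct {
	// A non-empty DeliverSubject marks a Push consumer (assumption of this sketch).
	DeliverSubject string
	PriorityGroups []string
	PriorityPolicy string
}

// Hypothetical error values; the ADR does not define error codes or texts.
var (
	errPushConsumer   = errors.New("priority groups require a pull consumer")
	errMultipleGroups = errors.New("only one priority group is supported")
)

// validatePriorityGroups applies the initial-implementation rules:
// the feature is pull-only and limited to a single group.
func validatePriorityGroups(cfg ConsumerConfig) error {
	if len(cfg.PriorityGroups) == 0 {
		return nil // feature not activated
	}
	if cfg.DeliverSubject != "" {
		return errPushConsumer
	}
	if len(cfg.PriorityGroups) > 1 {
		return errMultipleGroups
	}
	return nil
}

func main() {
	cfg := ConsumerConfig{
		PriorityGroups: []string{"jobs", "reports"},
		PriorityPolicy: "overflow",
	}
	// Two groups are rejected in the initial implementation.
	fmt.Println(validatePriorityGroups(cfg))
}
```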

## Priority Policies

### `overflow` policy

Users want certain clients to only get messages when certain criteria are met.

Imagine jobs are best processed locally in `us-east-1`, but at times there might be so many jobs in that region that
having the `us-west-1` region handle some overflow, while less optimal in terms of transit costs and latency, would be
desirable to ensure client needs are served as soon as possible.

The `overflow` policy enables Pull requests to be served only if criteria like `num_pending` and `num_ack_pending` are
above a certain limit, otherwise those Pull requests will sit idle in the same way that they would if no messages were
available (receiving heartbeats etc).

```go
{
PriorityGroups: ["jobs"],
PriorityPolicy: "overflow",
AckPolicy: "explicit",
// ... other consumer options
}
```

Here we state that the Consumer has one group called `jobs`, operates on the `overflow` policy and requires `explicit`
Acks; any other Ack policy will produce an error. If we force this ack policy in normal use we should error in Pedantic mode.

Pull requests will have the following additional fields:

* `"group": "jobs"` - the group the pull belongs to; pulls not part of a valid group will result in an error
* `"min_pending": 1000` - only deliver messages when `num_pending` for the consumer is >= 1000
* `"min_ack_pending": 1000` - only deliver messages when `num_ack_pending` for the consumer is >= 1000

If `min_pending` and `min_ack_pending` are both given, either being satisfied will result in delivery (boolean OR).
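
As a rough sketch of the fields and the boolean OR rule described above: the `OverflowPullRequest` type and `shouldDeliver` function are hypothetical names for this example, and treating a zero value as "condition not set" is an assumption rather than something this ADR specifies.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OverflowPullRequest models only the additional pull request fields
// described in this section (hypothetical type name).
type OverflowPullRequest struct {
	Group         string `json:"group"`
	MinPending    int64  `json:"min_pending,omitempty"`
	MinAckPending int64  `json:"min_ack_pending,omitempty"`
}

// shouldDeliver reports whether a pull may be served given the consumer's
// current num_pending and num_ack_pending values.
func shouldDeliver(req OverflowPullRequest, numPending, numAckPending int64) bool {
	// Assumption: zero means the condition was not supplied.
	if req.MinPending == 0 && req.MinAckPending == 0 {
		return true
	}
	// Either condition being satisfied is enough (boolean OR).
	if req.MinPending > 0 && numPending >= req.MinPending {
		return true
	}
	if req.MinAckPending > 0 && numAckPending >= req.MinAckPending {
		return true
	}
	return false
}

func main() {
	req := OverflowPullRequest{Group: "jobs", MinPending: 1000, MinAckPending: 1000}

	body, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))

	// 1500 pending messages satisfies min_pending even though
	// num_ack_pending is far below its threshold.
	fmt.Println(shouldDeliver(req, 1500, 10))
}
```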

In the specific case where `MaxAckPending` is 1 and a pull is made using `min_pending: 1`, this should only be served when
there are no other pulls waiting. This means we have to give priority to pulls without conditions over those with
conditions when considering the next pull that will receive a message.
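
One way to picture this ordering is a two-pass selection over the waiting pulls. The sketch below is illustrative only; `waitingPull`, `nextPull` and the slice-based queue are assumptions for this example and say nothing about how the server stores waiting requests.

```go
package main

import "fmt"

// waitingPull is a hypothetical record of a pull request waiting for messages.
type waitingPull struct {
	Name          string
	MinPending    int64
	MinAckPending int64
}

// conditional reports whether the pull carries any overflow condition.
func (p waitingPull) conditional() bool {
	return p.MinPending > 0 || p.MinAckPending > 0
}

// satisfied reports whether at least one supplied condition is met.
func (p waitingPull) satisfied(numPending, numAckPending int64) bool {
	return (p.MinPending > 0 && numPending >= p.MinPending) ||
		(p.MinAckPending > 0 && numAckPending >= p.MinAckPending)
}

// nextPull returns the index of the pull to serve next, or -1 if none
// qualifies. Pulls without conditions always win over conditional ones.
func nextPull(waiting []waitingPull, numPending, numAckPending int64) int {
	for i, p := range waiting {
		if !p.conditional() {
			return i
		}
	}
	for i, p := range waiting {
		if p.satisfied(numPending, numAckPending) {
			return i
		}
	}
	return -1
}

func main() {
	waiting := []waitingPull{
		{Name: "overflow", MinPending: 1},
		{Name: "local"},
	}
	// The unconditional pull is chosen although the conditional one
	// arrived first and its condition is satisfied.
	fmt.Println(waiting[nextPull(waiting, 5, 0)].Name)
}
```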

### `pinned_client` policy

Users want to have a single client perform all the work for a consumer, but they also want to have a stand-by client that
can take over when the primary, aka `pinned` client, fails.

**NOTE: We should not describe this in terms of exclusivity as there is no such guarantee; there will be times when one
client thinks it is pinned when it is not because the server switched.**

The `pinned_client` policy provides server-side orchestration for the selection of the pinned client.

```go
{
PriorityGroups: ["jobs"],
PriorityPolicy: "pinned_client",
PriorityTimeout: 120*time.Second,
AckPolicy: "explicit",
// ... other consumer options
}
```

This configuration states:

* We have 1 group defined and all pulls have to belong to this group
* The policy is `pinned_client`, which activates these behaviors
* When a pinned client has not done any pulls in the last 120 seconds the server will switch to another client
* AckPolicy has to be `explicit`. If we force this ack policy in normal use we should error in Pedantic mode

A pull request will have the following additional fields:

* `"group": "jobs"` - the group the pull belongs to; pulls not part of a valid group will result in an error
* `"id": "xyz"` - the pinned client will have this ID set to the one the server last supplied (see below), otherwise
this field is absent

After selecting a new pinned client, the first message delivered to this client, and all future ones, will
include a `Nats-Pin-Id: xyz` header. The client that gets this message should at that point ensure that all future pull
requests have the same ID set.

When a new pinned client needs to be picked (after a timeout, admin action, first delivery etc.), this process is followed:

1. Stop delivering messages for this group, wait for all in-flight messages to be completed, continue to serve heartbeats
2. Pick the new pinned client
3. Store the new pinned `nuid`
4. Deliver the message to the new pinned client with the ID set
5. Create an advisory that a new pinned client was picked
6. Respond with a 4xx header to any pulls, including waiting ones, that have a different ID set. A client that receives this error will clear the ID and pull with no ID

If no pulls from the pinned client are received within `PriorityTimeout`, the server will switch again using the same flow as above.
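
The timeout rule can be sketched as a small piece of bookkeeping per group. This is a simplified illustration under stated assumptions: `groupPin`, `recordPull` and `needsSwitch` are hypothetical names, only pulls carrying the current pinned ID refresh the timer, and the comparison is inclusive (`>=`), none of which this ADR specifies.

```go
package main

import (
	"fmt"
	"time"
)

// Values used by the example; 120 seconds matches the configuration above.
const priorityTimeout = 120 * time.Second

var start = time.Date(2024, 5, 14, 12, 0, 0, 0, time.UTC)

// groupPin is hypothetical server-side bookkeeping for one priority group.
type groupPin struct {
	pinnedID string
	lastPull time.Time
}

// recordPull notes a pull at time now. Only a pull carrying the current
// pinned ID refreshes the timer; it returns whether the timer was refreshed.
func (g *groupPin) recordPull(id string, now time.Time) bool {
	if g.pinnedID == "" || id != g.pinnedID {
		return false
	}
	g.lastPull = now
	return true
}

// needsSwitch reports whether a new pinned client has to be picked.
func (g *groupPin) needsSwitch(now time.Time, timeout time.Duration) bool {
	if g.pinnedID == "" {
		return true // nothing pinned yet, e.g. first delivery or after an unpin
	}
	return now.Sub(g.lastPull) >= timeout
}

func main() {
	g := &groupPin{pinnedID: "xyz", lastPull: start}

	// 30s after the last pull: the pin is kept.
	fmt.Println(g.needsSwitch(start.Add(30*time.Second), priorityTimeout))

	// The pinned client pulls again at 60s, which resets the timer.
	g.recordPull("xyz", start.Add(60*time.Second))

	// 150s after start is only 90s after the last pull: the pin is kept.
	fmt.Println(g.needsSwitch(start.Add(150*time.Second), priorityTimeout))

	// 180s after start is 120s after the last pull: time to switch.
	fmt.Println(g.needsSwitch(start.Add(180*time.Second), priorityTimeout))
}
```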

We considered introducing the concept of a priority field so clients can self-organise, but we decided
to deliver that in a future iteration.

Clients can expose callback notifications when they become pinned (the first message with a `Nats-Pin-Id` header is received) and
when they lose the pin (they receive the 4xx error when doing a pull with an old ID).
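
A client could track the pin along these lines. The sketch is not taken from any client library: `pinTracker`, its methods and callbacks, and the plain `map[string][]string` standing in for message headers are assumptions for this example, and the specific 4xx status is left to the caller because this ADR does not fix it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const pinIDHeader = "Nats-Pin-Id"

// pinnedPullRequest holds the additional pull fields for this policy.
type pinnedPullRequest struct {
	Group string `json:"group"`
	ID    string `json:"id,omitempty"`
}

// pinTracker is a hypothetical client-side helper that remembers the last
// pin ID supplied by the server and fires callbacks on changes.
type pinTracker struct {
	group      string
	id         string
	onPinned   func(id string)
	onUnpinned func(oldID string)
}

// messageReceived inspects the headers of a delivered message and records
// a new pin ID when one is present.
func (p *pinTracker) messageReceived(headers map[string][]string) {
	v := headers[pinIDHeader]
	if len(v) == 0 || v[0] == "" || v[0] == p.id {
		return
	}
	p.id = v[0]
	if p.onPinned != nil {
		p.onPinned(p.id)
	}
}

// pinLost is called by the caller when a pull carrying an ID was rejected
// with the 4xx error; it clears the ID so later pulls carry none.
func (p *pinTracker) pinLost() {
	if p.id == "" {
		return
	}
	old := p.id
	p.id = ""
	if p.onUnpinned != nil {
		p.onUnpinned(old)
	}
}

// nextRequest builds the fields for the next pull request.
func (p *pinTracker) nextRequest() pinnedPullRequest {
	return pinnedPullRequest{Group: p.group, ID: p.id}
}

// mustJSON encodes v or panics; adequate for a demonstration only.
func mustJSON(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	p := &pinTracker{
		group:      "jobs",
		onPinned:   func(id string) { fmt.Println("pinned with", id) },
		onUnpinned: func(old string) { fmt.Println("lost pin", old) },
	}
	p.messageReceived(map[string][]string{pinIDHeader: {"xyz"}})
	fmt.Println(mustJSON(p.nextRequest())) // request now carries the ID
	p.pinLost()
	fmt.Println(mustJSON(p.nextRequest())) // ID is absent again
}
```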

A new API, `$JS.API.CONSUMER.UNPIN.<STREAM>.<CONSUMER>.<GROUP>`, can be called which will clear the ID and trigger a client switch as above.
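
Building the subject for this API might look like the following. `unpinSubject` is a hypothetical helper, the token validation is an assumption added for safety, and the stream and consumer names are made up; the request payload is not shown because this ADR does not describe one.

```go
package main

import (
	"fmt"
	"strings"
)

// Subject layout as given in this ADR.
const unpinSubjectFormat = "$JS.API.CONSUMER.UNPIN.%s.%s.%s"

// unpinSubject fills in the stream, consumer and group tokens. Rejecting
// empty tokens and tokens containing separators, wildcards or whitespace
// is an assumption of this sketch.
func unpinSubject(stream, consumer, group string) (string, error) {
	for _, tok := range []string{stream, consumer, group} {
		if tok == "" || strings.ContainsAny(tok, ".*> \t\r\n") {
			return "", fmt.Errorf("invalid subject token %q", tok)
		}
	}
	return fmt.Sprintf(unpinSubjectFormat, stream, consumer, group), nil
}

func main() {
	// "ORDERS" and "worker" are made-up names for illustration.
	subj, err := unpinSubject("ORDERS", "worker", "jobs")
	if err != nil {
		panic(err)
	}
	fmt.Println(subj)
}
```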

Consumer state will include a new field `PriorityGroups` of type `[]PriorityGroupState`:

```go
type PriorityGroupState struct {
	Group          string     `json:"name"`
	PinnedClientId string     `json:"pinned_id,omitempty"`
	PinnedTs       *time.Time `json:"pinned_ts,omitempty"`
}
```
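
To illustrate how the `omitempty` tags shape the JSON, here is a small sketch that encodes the struct with and without a pinned client. The `encode` helper, the `pinnedAt` timestamp and the sample values are made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// PriorityGroupState repeats the struct from this section so the
// example is self-contained.
type PriorityGroupState struct {
	Group          string     `json:"name"`
	PinnedClientId string     `json:"pinned_id,omitempty"`
	PinnedTs       *time.Time `json:"pinned_ts,omitempty"`
}

// pinnedAt is an arbitrary timestamp used only for illustration.
var pinnedAt = time.Date(2024, 5, 14, 12, 0, 0, 0, time.UTC)

// encode is a hypothetical helper returning the JSON form of one state.
func encode(s PriorityGroupState) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	pinned := PriorityGroupState{Group: "jobs", PinnedClientId: "xyz", PinnedTs: &pinnedAt}
	unpinned := PriorityGroupState{Group: "jobs"}

	for _, s := range []PriorityGroupState{pinned, unpinned} {
		out, err := encode(s)
		if err != nil {
			panic(err)
		}
		// Without a pinned client only the name field is emitted.
		fmt.Println(out)
	}
}
```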

Future iterations will include delivery stats per group.
12 changes: 10 additions & 2 deletions go.mod
@@ -1,5 +1,13 @@
module github.com/nats-io/nats-architecture-and-design

-go 1.16
+go 1.22

-require gitlab.com/golang-commonmark/markdown v0.0.0-20191127184510-91b5b3c99c19
+require gitlab.com/golang-commonmark/markdown v0.0.0-20211110145824-bf3e522c626a
+
+require (
+gitlab.com/golang-commonmark/html v0.0.0-20191124015941-a22733972181 // indirect
+gitlab.com/golang-commonmark/linkify v0.0.0-20200225224916-64bca66f6ad3 // indirect
+gitlab.com/golang-commonmark/mdurl v0.0.0-20191124015652-932350d1cb84 // indirect
+gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f // indirect
+golang.org/x/text v0.15.0 // indirect
+)
16 changes: 8 additions & 8 deletions go.sum
@@ -1,19 +1,19 @@
github.com/russross/blackfriday v2.0.0+incompatible h1:cBXrhZNUf9C+La9/YpS+UHpUT8YD6Td9ZMSU9APFcsk=
github.com/russross/blackfriday v2.0.0+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
gitlab.com/golang-commonmark/html v0.0.0-20191124015941-a22733972181 h1:K+bMSIx9A7mLES1rtG+qKduLIXq40DAzYHtb0XuCukA=
gitlab.com/golang-commonmark/html v0.0.0-20191124015941-a22733972181/go.mod h1:dzYhVIwWCtzPAa4QP98wfB9+mzt33MSmM8wsKiMi2ow=
gitlab.com/golang-commonmark/linkify v0.0.0-20191026162114-a0c2df6c8f82 h1:oYrL81N608MLZhma3ruL8qTM4xcpYECGut8KSxRY59g=
gitlab.com/golang-commonmark/linkify v0.0.0-20191026162114-a0c2df6c8f82/go.mod h1:Gn+LZmCrhPECMD3SOKlE+BOHwhOYD9j7WT9NUtkCrC8=
gitlab.com/golang-commonmark/markdown v0.0.0-20191127184510-91b5b3c99c19 h1:HsZm6XaTpEgZiZqcXZkUbG6BNtSZE3XyCTfo52YBoDY=
gitlab.com/golang-commonmark/markdown v0.0.0-20191127184510-91b5b3c99c19/go.mod h1:CRIzp0wh6PvKEAeEOtp9wEpNKJJ1VFTNfHO4+ToRgVA=
gitlab.com/golang-commonmark/linkify v0.0.0-20200225224916-64bca66f6ad3 h1:1Coh5BsUBlXoEJmIEaNzVAWrtg9k7/eJzailMQr1grw=
gitlab.com/golang-commonmark/linkify v0.0.0-20200225224916-64bca66f6ad3/go.mod h1:Gn+LZmCrhPECMD3SOKlE+BOHwhOYD9j7WT9NUtkCrC8=
gitlab.com/golang-commonmark/markdown v0.0.0-20211110145824-bf3e522c626a h1:O85GKETcmnCNAfv4Aym9tepU8OE0NmcZNqPlXcsBKBs=
gitlab.com/golang-commonmark/markdown v0.0.0-20211110145824-bf3e522c626a/go.mod h1:LaSIs30YPGs1H5jwGgPhLzc8vkNc/k0rDX/fEZqiU/M=
gitlab.com/golang-commonmark/mdurl v0.0.0-20191124015652-932350d1cb84 h1:qqjvoVXdWIcZCLPMlzgA7P9FZWdPGPvP/l3ef8GzV6o=
gitlab.com/golang-commonmark/mdurl v0.0.0-20191124015652-932350d1cb84/go.mod h1:IJZ+fdMvbW2qW6htJx7sLJ04FEs4Ldl/MDsJtMKywfw=
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f h1:Wku8eEdeJqIOFHtrfkYUByc4bCaTeA6fL0UJgfEiFMI=
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f/go.mod h1:Tiuhl+njh/JIg0uS/sOJVYi0x2HEa5rc1OAaVsb5tAs=
gitlab.com/opennota/wd v0.0.0-20180912061657-c5d65f63c638 h1:uPZaMiz6Sz0PZs3IZJWpU5qHKGNy///1pacZC9txiUI=
gitlab.com/opennota/wd v0.0.0-20180912061657-c5d65f63c638/go.mod h1:EGRJaqe2eO9XGmFtQCvV3Lm9NLico3UhFwUpCG/+mVU=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
