Merge pull request #237 from tulios/chore/eos-documentation
Document EoS work in README
tulios authored Jan 7, 2019
2 parents 9cda31b + fa026cf commit 40eab08
74 changes: 72 additions & 2 deletions README.md
@@ -39,6 +39,9 @@ KafkaJS is battle-tested and ready for production.
- [Options](#producing-messages-options)
- [Custom partitioner](#producing-messages-custom-partitioner)
- [Retry](#producing-messages-retry)
- [Transactions](#producer-transactions)
- [Sending Messages](#producer-transaction-messages)
- [Sending Offsets](#producer-transaction-offsets)
- [Compression](#producing-messages-compression)
- [GZIP](#producing-messages-compression-gzip)
- [Snappy](#producing-messages-compression-snappy)
@@ -173,6 +176,7 @@ __Available options:__
| factor | Randomization factor | `0.2` |
| multiplier | Exponential factor | `2` |
| retries | Max number of retries per call | `5` |
| maxInFlightRequests | Max number of requests that may be in progress at any time. If falsy, there is no limit. | `null` _(no limit)_ |

Example:
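
A minimal sketch of a client configured with the retry options above (the client id, broker addresses, and values are illustrative):

```javascript
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  retry: {
    factor: 0.2, // randomization factor
    multiplier: 2, // exponential factor
    retries: 8, // max number of retries per call
  },
})
```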

@@ -275,7 +279,6 @@ await producer.send({
| timeout | The time to await a response in ms | `30000` |
| compression | Compression codec | `CompressionTypes.None` |
| transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the __broker__, the request will fail with an `InvalidTransactionTimeout` error | `60000` |
| idempotent | _Experimental._ If enabled, the producer will ensure that each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to `MAX_SAFE_INTEGER`. | `false` |

By default, the producer is configured to distribute the messages with the following logic:
@@ -401,6 +404,68 @@ The option `retry` can be used to customize the configuration for the producer.

Take a look at [Retry](#configuration-default-retry) for more information.

### <a name="producer-transactions"></a> Transactions

KafkaJS provides a simple interface to support Kafka transactions (requires Kafka >= v0.11).

#### <a name="producer-transaction-messages"></a> Sending Messages within a Transaction

You initialize a transaction by making an async call to `producer.transaction()`. The returned transaction object has the methods `send` and `sendBatch`, with the same signatures as on the producer. When you are done, call `transaction.commit()` or `transaction.abort()` to end the transaction. A transactionally aware consumer will only read messages which were committed.

_Note_: Kafka requires that the transactional producer have the following configuration to _guarantee_ EoS ("exactly-once semantics"):

- The producer must have a maximum of 1 in-flight request
- The producer must wait for acknowledgement from all replicas (acks=-1)
- The producer must have unlimited retries

```javascript
const { Kafka } = require('kafkajs')

const client = new Kafka({
clientId: 'transactional-client',
brokers: ['kafka1:9092'],
// Cannot guarantee EoS without max in flight requests of 1
maxInFlightRequests: 1,
})
// Setting `idempotent` to `true` will correctly configure the producer
// to use unlimited retries and require acks from all replicas
const producer = client.producer({ idempotent: true })

await producer.connect()

// Begin a transaction
const transaction = await producer.transaction()

try {
// Call one of the transaction's send methods
await transaction.send({ topic, messages })

// Commit the transaction
await transaction.commit()
} catch (e) {
// Abort the transaction in event of failure
await transaction.abort()
}
```

#### <a name="producer-transaction-offsets"></a> Sending Offsets

To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the `transaction.sendOffsets()` method. This is necessary whenever we want a transaction to produce messages derived from a consumer, in a "consume-transform-produce" loop.

```javascript
await transaction.sendOffsets({
consumerGroupId, topics
})
```

`topics` has the following structure:

```
[{
topic: <String>,
partitions: [{
partition: <Number>,
offset: <String>
}]
}]
```
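
Putting it together, a consume-transform-produce loop might look roughly like the following sketch. It assumes a connected `consumer` and a transactional `producer` as configured above; `transform`, the topic names, and the group id are hypothetical:

```javascript
await consumer.subscribe({ topic: 'input-topic' })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const transaction = await producer.transaction()

    try {
      // Produce the transformed message and commit the consumer's offset
      // atomically within the same transaction
      await transaction.send({
        topic: 'output-topic',
        messages: [{ value: transform(message.value) }], // hypothetical transform
      })

      await transaction.sendOffsets({
        consumerGroupId: 'my-group-id',
        topics: [
          {
            topic,
            partitions: [
              // Commit the offset of the *next* message to be consumed
              { partition, offset: (Number(message.offset) + 1).toString() },
            ],
          },
        ],
      })

      await transaction.commit()
    } catch (e) {
      await transaction.abort()
    }
  },
})
```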

### <a name="producing-messages-compression"></a> Compression

Since KafkaJS aims to have as small a footprint and as few dependencies as possible, only the GZIP codec is part of the core functionality. Providing plugins supporting other codecs might be considered in the future.
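
A GZIP-compressed send might look like this (the topic and payload are illustrative):

```javascript
const { CompressionTypes } = require('kafkajs')

await producer.send({
  topic: 'topic-name',
  compression: CompressionTypes.GZIP,
  messages: [{ key: 'key1', value: 'hello world!' }],
})
```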
@@ -593,7 +658,9 @@ await consumer.run({
> `resolveOffset()` is used to mark a message in the batch as processed. In case of errors, the consumer will automatically commit the resolved offsets.
> `commitOffsetsIfNecessary(offsets?)` is used to commit offsets based on the autoCommit configurations (`autoCommitInterval` and `autoCommitThreshold`). Note that auto commit won't happen in `eachBatch` if `commitOffsetsIfNecessary` is not invoked. Take a look at [autoCommit](#consuming-messages-auto-commit) for more information.
> `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
Example:
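
A sketch of an `eachBatch` handler wiring these helpers together (`processMessage` is a hypothetical handler):

```javascript
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary, isRunning }) => {
    for (let message of batch.messages) {
      if (!isRunning()) break
      await processMessage(message) // hypothetical handler
      resolveOffset(message.offset) // mark the message as processed
      await commitOffsetsIfNecessary() // commit if autoCommit thresholds are met
      await heartbeat() // keep the consumer session alive
    }
  },
})
```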

@@ -639,6 +706,8 @@ consumer.run({

Having both flavors at the same time is also possible; the consumer will commit the offsets whenever either condition (the interval or the number of messages) is met.

`autoCommit`: Advanced option to disable auto committing altogether. If auto committing is disabled you must manually commit message offsets, either by using the `commitOffsetsIfNecessary` method available in the `eachBatch` callback, or by [sending message offsets in a transaction](#producer-transaction-offsets). The `commitOffsetsIfNecessary` method will still respect the other autoCommit options if set. Default: `true`
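
A sketch of manual committing with auto commit disabled, following the `eachBatch` path described above (`processMessage` is a hypothetical handler):

```javascript
await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
    for (let message of batch.messages) {
      await processMessage(message) // hypothetical handler
      resolveOffset(message.offset)
    }

    // Auto commit is disabled, so commit the resolved offsets explicitly
    await commitOffsetsIfNecessary()
  },
})
```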

### <a name="consuming-messages-from-beginning"></a> fromBeginning

The consumer group will use the latest committed offset when fetching messages. If the offset is invalid or not defined, `fromBeginning` defines the behavior of the consumer group.
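
For instance (the topic name is illustrative):

```javascript
await consumer.subscribe({ topic: 'topic-name', fromBeginning: true })
```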
@@ -675,6 +744,7 @@ kafka.consumer({
| maxBytes | Maximum amount of bytes to accumulate in the response. Supported by Kafka >= `0.10.1.0` | `10485760` (10MB) |
| maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by `minBytes` | `5000` |
| retry | See [retry](#configuration-default-retry) for more information | `{ retries: 10 }` |
| readUncommitted | Configures the consumer isolation level. If `false` (default), the consumer will not return any transactional messages which were not committed. | `false` |
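
For example, a consumer opting in to reading uncommitted transactional messages (the group id is illustrative):

```javascript
const consumer = kafka.consumer({
  groupId: 'my-group',
  readUncommitted: true,
})
```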

### <a name="consuming-messages-pause-resume"></a> Pause & Resume

