
Commit

Merge branch 'master' into parameters-fix
szczygiel-m authored Jun 17, 2024
2 parents fe7fcaa + a478894 commit 3e78e3a
Showing 232 changed files with 2,320 additions and 3,594 deletions.
42 changes: 21 additions & 21 deletions build.gradle
@@ -49,26 +49,27 @@ allprojects {

project.ext.versions = [
kafka : '3.6.2',
guava : '23.0',
jackson : '2.15.2',
jersey : '3.1.2',
jetty : '12.0.7',
guava : '33.1.0-jre',
jackson : '2.17.0',
jersey : '3.1.6',
jetty : '12.0.8',
curator : '5.4.0',
dropwizard_metrics: '4.1.0',
micrometer_metrics: '1.11.1',
wiremock : '3.3.1',
spock : '2.4-M1-groovy-4.0',
groovy : '4.0.12',
avro : '1.9.1',
dropwizard_metrics: '4.2.25',
micrometer_metrics: '1.12.5',
wiremock : '3.5.2',
spock : '2.4-M4-groovy-4.0',
groovy : '4.0.21',
avro : '1.11.3',
json2avro : '0.2.14',
// TODO: newest version requires subject alternative name in a certificate during host verification, current test cert does not have one
okhttp : '3.9.1',
undertow : '2.0.29.Final',
spring_web : '6.1.2',
failsafe : '2.3.1',
junit_jupiter : '5.9.1',
testcontainers : '1.18.1',
spring : '3.2.1',
assertj : '3.24.2'
undertow : '2.3.12.Final',
spring_web : '6.1.6',
failsafe : '2.4.4',
junit_jupiter : '5.10.2',
testcontainers : '1.19.8',
spring : '3.2.4',
assertj : '3.25.3'
]

repositories {
@@ -77,15 +78,14 @@ allprojects {

dependencies {
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.4'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0'

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation group: 'com.tngtech.java', name: 'junit-dataprovider', version: '1.10.0'
testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.0.2'
testImplementation group: 'org.mockito', name: 'mockito-all', version: '1.9.5'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.11.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: versions.assertj
testImplementation group: 'com.jayway.awaitility', name: 'awaitility', version: '1.6.1'
testImplementation group: 'com.googlecode.catch-exception', name: 'catch-exception', version: '1.2.0'
testImplementation group: 'org.awaitility', name: 'awaitility', version: '4.2.1'

annotationProcessor group: 'org.springframework.boot', name: 'spring-boot-configuration-processor', version: versions.spring
}
19 changes: 0 additions & 19 deletions docker/docker-compose.yml
@@ -30,19 +30,6 @@ services:
- kafka_data:/var/lib/kafka/data
- kafka_secrets:/etc/kafka/secrets

graphite:
image: graphiteapp/graphite-statsd:1.1.3
ports:
- '2003-2004:2003-2004'
- '2023-2024:2023-2024'
- '8125:8125/udp'
- '8126:8126'
- '8082:80'
volumes:
- graphite_conf:/opt/graphite/conf
- graphite_data:/opt/graphite/storage
- statsd_data:/opt/statsd

frontend:
build:
context: ../
@@ -52,7 +39,6 @@ services:
depends_on:
- zk
- kafka
- graphite

consumers:
build:
@@ -61,7 +47,6 @@ services:
depends_on:
- zk
- kafka
- graphite

management:
build:
@@ -72,7 +57,6 @@ services:
depends_on:
- zk
- kafka
- graphite

schema-registry:
image: "confluentinc/cp-schema-registry:${CONFLUENT_IMAGES_TAG}"
@@ -87,9 +71,6 @@ services:
- "8081:8081"

volumes:
graphite_conf:
graphite_data:
statsd_data:
zk_secrets:
zk_data:
zk_log:
5 changes: 0 additions & 5 deletions docker/latest/consumers/consumers.yaml
@@ -8,11 +8,6 @@ consumer:
clusters:
- datacenter: "dc"
brokerList: "kafka:29092"
graphite:
host: "graphite"
metrics:
metric-registry:
graphiteReporterEnabled: true
workload:
consumerPerSubscription: 1
schema:
5 changes: 0 additions & 5 deletions docker/latest/frontend/frontend.yaml
@@ -8,11 +8,6 @@ frontend:
clusters:
- datacenter: "dc"
brokerList: "kafka:29092"
graphite:
host: "graphite"
metrics:
metric-registry:
graphiteReporterEnabled: true
schema:
cache:
refreshAfterWrite: 1m
5 changes: 0 additions & 5 deletions docker/latest/management/management.yaml
@@ -17,11 +17,6 @@ kafka:
connectionTimeout: 3000
bootstrapKafkaServer: kafka:29092

graphite:
client:
enabled: true
externalMonitoringUrl: graphite:8082

server:
port: 8090

4 changes: 2 additions & 2 deletions docs/docs/configuration/buffer-persistence.md
@@ -1,4 +1,4 @@
# Publishing buffer persistence
# Publishing buffer persistence [deprecated]

Hermes Frontend API has an option to register callbacks triggered during different phases of the message lifetime:

@@ -15,7 +15,7 @@ to disk. Map structure is continuously persisted to disk, as it is stored in off

When Hermes Frontend starts up, it scans the filesystem in search of an existing persisted map. If found, it is read and any
persisted events are sent to the Message Store. This way recovery after a crash is fully automatic. If the Hermes process or
server crashes, nothing is lost.
server crashes, events that were flushed to disk are recovered.

There is additional protection against flooding subscribers with outdated events. When reading events from persisted
storage, Hermes filters out messages older than a configurable threshold, set to 3 days by default.
26 changes: 13 additions & 13 deletions docs/docs/configuration/consumers-tuning.md
@@ -2,18 +2,18 @@

## HTTP Sender

Option | Description | Default value
---------------------------------------------------- | ----------------------------------------------------------- | -------------
consumer.http-client.serial.http1.threadPoolSize | size of thread pool for sender threads (global) | 30
consumer.http-client.serial.http1.maxConnectionsPerDestination | max connections per remote host | 100
| Option | Description | Default value |
|----------------------------------------------------------------|-------------------------------------------------|---------------|
| consumer.http-client.serial.http1.threadPoolSize | size of thread pool for sender threads (global) | 30 |
| consumer.http-client.serial.http1.maxConnectionsPerDestination | max connections per remote host | 100 |

## Consumers core

Option | Description | Default value
----------------------------- | ------------------------------------------------------------------------ | -------------
consumer.commit.offset.period | interval between committing offsets to Kafka | 60s
consumer.threadPoolSize | thread pool for threads involved in consuming, 1 thread per subscription | 500
consumer.serialConsumer.inflightSize | how many messages can be kept in send queue, per subscription | 100
| Option | Description | Default value |
|--------------------------------------|--------------------------------------------------------------------------|---------------|
| consumer.commit.offset.period | interval between committing offsets to Kafka | 60s |
| consumer.threadPoolSize | thread pool for threads involved in consuming, 1 thread per subscription | 500 |
| consumer.serialConsumer.inflightSize | how many messages can be kept in send queue, per subscription | 100 |
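
A rough sketch of how these dotted options map onto a consumers configuration file, assuming the same YAML nesting style as the `consumers.yaml` shown earlier in this diff (the values are the defaults from the table):

```yaml
# hypothetical consumers.yaml fragment - nesting assumed from the dotted option names above
consumer:
  commit:
    offset:
      period: 60s        # interval between committing offsets to Kafka
  threadPoolSize: 500    # consuming threads, 1 thread per subscription
  serialConsumer:
    inflightSize: 100    # max messages kept in the send queue, per subscription
```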

## Workload constraints management

@@ -26,10 +26,10 @@ subscriptions assigned to itself.

These numbers can be configured:

Option | Description | Default value
--------------------------------------------------- | ----------------------------------------- | ---------------------
consumer.workload.consumersPerSubscription | Number of consumers to which the subscription will be assigned. If this value is greater than the number of available consumers, Hermes will assign the subscription to all available consumers. | 2
consumer.workload.maxSubscriptionsPerConsumer | The maximum number of subscriptions assigned to a single consumer. If all consumers have the maximum number of subscriptions assigned, a new subscription will not be activated until a new consumer is added or another subscription is unassigned. | 200
| Option | Description | Default value |
|-----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| consumer.workload.consumersPerSubscription | Number of consumers to which the subscription will be assigned. If this value is greater than the number of available consumers, Hermes will assign the subscription to all available consumers. | 2 |
| consumer.workload.maxSubscriptionsPerConsumer | The maximum number of subscriptions assigned to a single consumer. If all consumers have the maximum number of subscriptions assigned, a new subscription will not be activated until a new consumer is added or another subscription is unassigned. | 200 |

Additionally, Hermes allows the property `consumer.workload.consumersPerSubscription` to be configured for specific
topics or subscriptions at runtime via REST API.
2 changes: 1 addition & 1 deletion docs/docs/overview/architecture.md
@@ -16,7 +16,7 @@ Hermes integrates with multiple systems, each having different role.

* **Message Store** - stores and routes messages, current implementation: Kafka
* **Metadata Store** - shared metadata storage for all Hermes modules, current implementation: Zookeeper
* **Metrics Store** *[optional]* - stores metrics gathered by Hermes, current implementation: Graphite
* **Metrics Store** *[optional]* - stores metrics gathered by Hermes, currently Hermes exposes metrics in Prometheus format
* **Tracking Store** *[optional]* - stores tracking (message trace) information, current implementation: ElasticSearch

## Message flow
3 changes: 1 addition & 2 deletions docs/docs/quickstart.md
@@ -56,7 +56,7 @@ image: allegro/hermes-management:hermes-[specific version tag]
## Development
The default `docker-compose` setup will start all hermes modules (consumers, frontend, management), together
with its dependencies (Kafka, ZooKeeper, Graphite, Schema Registry). To run a specific module with gradle/IntelliJ,
with its dependencies (Kafka, ZooKeeper, Schema Registry). To run a specific module with gradle/IntelliJ,
just comment out the module in `services` section of the `docker-compose.yml` file, and start the java process locally:

`./gradlew -p hermes-frontend run`
@@ -175,7 +175,6 @@ management:
depends_on:
- zk
- kafka
- graphite
[...]
```

2 changes: 1 addition & 1 deletion docs/docs/user/java-client.md
@@ -19,7 +19,7 @@ At the moment there are four implementations of `HermesSender`:
for asynchronous transmission
* **WebClientHermesSender** - for services using [Spring WebFlux](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html);
uses [WebClient](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/reactive/function/client/WebClient.html)
* **JerseyHermesSender** - recommended for services using [Jersey](<https://jersey.java.net/>)
* **JerseyHermesSender** - recommended for services using [Jersey](<https://eclipse-ee4j.github.io/jersey/>)
* **OkHttpHermesSender** - supports both HTTP/1.1 and HTTP/2 protocols, uses [OkHttp3 client](http://square.github.io/okhttp/)


65 changes: 52 additions & 13 deletions docs/docs/user/publishing.md
@@ -134,22 +134,21 @@ Failure statuses:
Each topic can define level of acknowledgement (ACK):

* leader ACK - only one Kafka node (leader) needs to acknowledge reception of message
* all ACK - all nodes that hold copy of message need to acknowledge reception of message
* all ACK - at least [min.insync.replicas](https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas) nodes must acknowledge reception of message

For most of the topic leader ACK is enough. This guarantees roughly 99.999..% reception rate. Only in rare cases, during
Kafka cluster rebalancing or nodes outage Kafka might confirm that message was received, while it was not saved and it
will be lost.
ACK configuration has the following consequences:

What does it mean in practice? Numbers differ per case and they are affected by multiple factors like frequency of
rebalancing taking place on Kafka clusters, Kafka version etc. In our production environment using ACK leader means we falsely
believe message was received by Kafka once per 20 million events. This is a very rough estimate that should show you
the scale, if you need numbers to base your decision on - please conduct own measurements.
- with `ACK leader` message writes are replicated asynchronously, so acknowledgment latency will be low. However, a message write may be lost
when there is a topic leadership change - e.g. due to a rebalance or broker restart.
- with `ACK all` message writes are synchronously replicated to replicas. Write acknowledgement latency will be much higher than with leader ACK,
and it will also have higher variance due to tail latency. However, messages will be persisted as long as the whole replica set does not go down simultaneously.

If you need 100% guarantee that message was saved, force all replicas to send ACK. The downside of this is much longer
response times, they tend to vary a lot as well. Thanks to Hermes buffering (described in paragraphs below), we are able
to guarantee some sane response times to our clients even in *ACK all* mode.
Publishers are advised to select topic ACK level based on their latency and durability requirements.
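
For illustration, the ACK level could be selected per topic through the management API. This is a minimal sketch, assuming the topic definition exposes an `ack` field accepting `LEADER` or `ALL` (mirroring the `PUT /topics/...` pattern used for remote DC fallback below); verify the exact field name for your Hermes version:

```http request
PUT /topics/my.group.my-topic
{
  "ack": "ALL"
}
```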

## Buffering
Hermes also provides a feature called Buffering (described in the paragraphs below) which offers consistent write latency
despite long Kafka response times. Note, however, that this mode may decrease message durability with the `ACK all` setting.

## Buffering [deprecated]

A Hermes administrator can set the maximum time for which Hermes will wait for a Kafka acknowledgment. By default, it is set to
65ms. After that time, a **202** response is sent to the client. The event is kept in the Kafka producer buffer and its delivery will
@@ -161,14 +160,54 @@ Kafka is back online.

### Buffer persistence

By default events are buffered in memory only. This raises the question about what happens in case of Hermes node failure
By default, events are buffered in memory only. This raises the question about what happens in case of Hermes node failure
(or force kill of process). Hermes Frontend API exposes callbacks that can be used to implement persistence model of
buffered events.

Default implementation uses [OpenHFT ChronicleMap](https://github.com/OpenHFT/Chronicle-Map) to persist unsent messages
to disk. Map structure is continuously persisted to disk, as it is stored in offheap memory as
[memory mapped file](https://en.wikipedia.org/wiki/Memory-mapped_file).

Using buffering with the `ACK all` setting means that durability of events may be lowered when a **202** status code is received. If the Hermes instance
is killed before the message is spilled to disk, or the data on disk becomes corrupted, the message is gone. Thus `ACK all` with a **202** status code
is similar to `ACK leader`, because a single node failure could cause the message to be lost.

### Deprecation notice
The buffering mechanism in Hermes is considered deprecated and is set to be removed in the future.

## Remote DC fallback

Hermes supports a remote datacenter fallback mechanism for [multi datacenter deployments](https://hermes-pubsub.readthedocs.io/en/latest/configuration/kafka-and-zookeeper/#multiple-kafka-and-zookeeper-clusters).

Fallback is configured on a per-topic basis, using the `fallbackToRemoteDatacenterEnabled` property:

```http request
PUT /topics/my.group.my-topic
{
"fallbackToRemoteDatacenterEnabled": true,
}
```

Using this setting automatically disables the buffering mechanism for the topic.

When fallback is enabled for a topic, Hermes will try to send a message to the local datacenter's Kafka first and will fall back to the remote datacenter's Kafka
if the local send fails.

Hermes also provides a speculative fallback mechanism which sends messages to the remote Kafka if the local Kafka is not responding in a timely manner.
A speculative send is performed after `frontend.kafka.fail-fast-producer.speculativeSendDelay` elapses.

When using remote DC fallback, Hermes attempts to send a message to Kafka for the duration defined by the `frontend.handlers.maxPublishRequestDuration` property. If
Hermes has not received an acknowledgment from Kafka within `maxPublishRequestDuration`, it will respond to the client with a **500** status code.

The table below summarizes the remote fallback configuration options:

| Option | Scope | Default value |
|--------------------------------------------------------|--------|---------------|
| fallbackToRemoteDatacenterEnabled | topic | false |
| frontend.kafka.fail-fast-producer.speculativeSendDelay | global | 250ms |
| frontend.handlers.maxPublishRequestDuration | global | 500ms |
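
For illustration, the two global options could be tuned in the frontend configuration. This is a minimal sketch assuming the same YAML nesting as the `frontend.yaml` shown earlier in this diff, with the default values from the table:

```yaml
# hypothetical frontend.yaml fragment - nesting assumed from the dotted property names above
frontend:
  kafka:
    fail-fast-producer:
      speculativeSendDelay: 250ms    # delay before speculatively sending to the remote DC
  handlers:
    maxPublishRequestDuration: 500ms # max time spent waiting for a Kafka acknowledgment
```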

## Partition assignment
The `Partition-Key` header can be used by publishers to specify the Kafka `key` that will be used for partition assignment of a message. This ensures
that all messages with a given `Partition-Key` will be sent to the same Kafka partition.
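
A hypothetical publish request using this header could look as follows (the topic name and payload are illustrative; publishing uses the `POST /topics/{topicName}` endpoint described in these docs):

```http request
POST /topics/my.group.my-topic
Partition-Key: user-42

{"userId": "user-42", "event": "item-added-to-cart"}
```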
10 changes: 5 additions & 5 deletions hermes-api/build.gradle
@@ -4,19 +4,19 @@ plugins {
}

dependencies {
api group: 'org.hibernate.validator', name: 'hibernate-validator', version: '8.0.0.Final'
api group: 'org.hibernate.validator', name: 'hibernate-validator', version: '8.0.1.Final'

api group: 'jakarta.ws.rs', name: 'jakarta.ws.rs-api', version: '3.1.0'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: versions.jackson
api group: 'com.fasterxml.jackson.jakarta.rs', name: 'jackson-jakarta-rs-json-provider', version: versions.jackson
api group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: versions.jackson
implementation group: 'com.google.guava', name: 'guava', version: versions.guava
api group: 'com.damnhandy', name: 'handy-uri-templates', version: '2.0.2'
api group: 'com.damnhandy', name: 'handy-uri-templates', version: '2.1.8'
api group: 'jakarta.xml.bind', name: 'jakarta.xml.bind-api', version: '4.0.0'

implementation group: 'com.sun.xml.bind', name: 'jaxb-core', version: '4.0.3'
implementation group: 'com.sun.xml.bind', name: 'jaxb-impl', version: '4.0.3'
implementation group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '2.1.1'
implementation group: 'com.sun.xml.bind', name: 'jaxb-core', version: '4.0.5'
implementation group: 'com.sun.xml.bind', name: 'jaxb-impl', version: '4.0.5'
implementation group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '3.0.0'


testImplementation group: 'org.spockframework', name: 'spock-core', version: versions.spock