Apache Flink connector to Elasticsearch v8
This project aims to fill the gap between Apache Flink and Elasticsearch version 8. Officially, the Flink project provides connectors for Elasticsearch versions 6 and 7, which share the same code base. However, the latest version of Elasticsearch relies on a brand-new Java API Client instead of the now-deprecated RestHighLevelClient. That is the main reason this connector was created from scratch.
As in the previous versions, each parallel instance internally uses a buffer to send requests to the Elasticsearch cluster in bulk. The buffer is flushed when it reaches the configured threshold or, if checkpointing is enabled, when a checkpoint is triggered.
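The buffering behaviour described above can be sketched in plain Java. This is a minimal illustration, not the connector's actual writer: the class `BulkBufferSketch`, its methods, and the `bulkSender` callback are hypothetical names introduced here, and the callback merely stands in for a bulk request to Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; this is not a class from the connector.
final class BulkBufferSketch<T> {
    private final long threshold;               // plays the role of the "threshold" option
    private final Consumer<List<T>> bulkSender; // stands in for one bulk request to the cluster
    private final List<T> buffer = new ArrayList<>();

    BulkBufferSketch(long threshold, Consumer<List<T>> bulkSender) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.bulkSender = bulkSender;
    }

    // Called once per incoming element; flushes as soon as the buffer is full.
    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Called when a checkpoint is taken, so that no buffered element is left behind.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        bulkSender.accept(new ArrayList<>(buffer)); // hand over a copy as one bulk request
        buffer.clear();
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        BulkBufferSketch<String> writer = new BulkBufferSketch<>(3, sent::add);

        for (String s : List.of("a", "b", "c", "d")) {
            writer.write(s);
        }
        System.out.println(sent + " pending=" + writer.pending()); // [[a, b, c]] pending=1

        writer.flush(); // simulated checkpoint
        System.out.println(sent + " pending=" + writer.pending()); // [[a, b, c], [d]] pending=0
    }
}
```

With a threshold of 3, the first three elements leave as one bulk request, while the fourth waits in the buffer until the simulated checkpoint flushes it.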
<dependency>
  <groupId>com.mtfelisb</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>1.0.0</version>
</dependency>
mvn install
implementation 'com.mtfelisb:flink-connector-elasticsearch:1.0.0'
DataStream<T> stream = ...

final ElasticsearchSink<T> esSink = ElasticsearchSinkBuilder.<T>builder()
    .setThreshold(100L)   // internal buffer limit that triggers a bulk request
    .setHost("localhost")
    .setPort(9200)
    .setEmitter(
        // Turn each element into a bulk update operation before it is buffered
        (element, operation, context) ->
            (BulkOperation.Builder) operation
                .update(up -> up
                    .id(element.getId())
                    .index(ELASTICSEARCH_INDEX)
                    .action(ac -> ac.doc(element.getValue()))
                )
    )
    .build();

stream.addSink(esSink);
env.execute();
The flink-connector-elasticsearch is integrated with Flink's checkpointing mechanism: whenever a checkpoint is triggered, it automatically flushes all buffered data to the Elasticsearch cluster. Hence, flink-connector-elasticsearch provides an AT_LEAST_ONCE guarantee when checkpointing is enabled.
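Important: The EXACTLY_ONCE guarantee can also be achieved if the update operations use deterministic ids and upsert is set to true. In that case, a request replayed after a failure overwrites the same document instead of creating a duplicate.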
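The effect of deterministic ids can be illustrated with a small, self-contained Java sketch. It uses an in-memory map as a stand-in for an index, so `IdempotentUpsertSketch`, `upsert`, and `indexWithGeneratedId` are hypothetical names and not part of the connector or of the Elasticsearch client. The same batch is delivered twice, as may happen when a job restarts from its last checkpoint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for an index; not an Elasticsearch API.
final class IdempotentUpsertSketch {
    private final Map<String, String> docs = new HashMap<>();
    private int autoId = 0;

    // Mimics an update with upsert enabled: create the document if missing, overwrite it otherwise.
    void upsert(String id, String doc) {
        docs.put(id, doc);
    }

    // Mimics a write without a deterministic id: every call creates a new document.
    void indexWithGeneratedId(String doc) {
        docs.put("auto-" + (autoId++), doc);
    }

    int size() {
        return docs.size();
    }

    public static void main(String[] args) {
        // Each entry is {id, value}.
        List<String[]> batch = List.of(
            new String[] {"user-1", "Alice"},
            new String[] {"user-2", "Bob"}
        );

        IdempotentUpsertSketch deterministic = new IdempotentUpsertSketch();
        IdempotentUpsertSketch generated = new IdempotentUpsertSketch();

        // Deliver the same batch twice to simulate a replay after recovery.
        for (int delivery = 0; delivery < 2; delivery++) {
            for (String[] entry : batch) {
                deterministic.upsert(entry[0], entry[1]);
                generated.indexWithGeneratedId(entry[1]);
            }
        }

        System.out.println(deterministic.size()); // 2: the replay overwrote the same documents
        System.out.println(generated.size());     // 4: the replay produced duplicates
    }
}
```

The replayed writes are harmless only because each element always maps to the same id; an id derived from processing time or a random value would not give this property.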
| Name | Type | Required | Description |
|---|---|---|---|
| threshold | Long | Yes | The internal buffer limit to send requests |
| host | String | Yes | The host to reach the Elasticsearch cluster |
| port | int | Yes | The port to reach the Elasticsearch cluster |
| emitter | Emitter<> | Yes | The Emitter implementation to process each element before pushing to the buffer |
| username | String | No | The username to authenticate in the Elasticsearch cluster |
| password | String | No | The password to authenticate in the Elasticsearch cluster |
- Initial release
See the LICENSE file for more details.