diff --git a/direct/io-elastic/license-header-spotless.txt b/direct/io-elastic/license-header-spotless.txt
new file mode 120000
index 000000000..077ffe5c5
--- /dev/null
+++ b/direct/io-elastic/license-header-spotless.txt
@@ -0,0 +1 @@
+../license-header-spotless.txt
\ No newline at end of file
diff --git a/direct/io-elastic/license-header.txt b/direct/io-elastic/license-header.txt
new file mode 120000
index 000000000..9e0b4f073
--- /dev/null
+++ b/direct/io-elastic/license-header.txt
@@ -0,0 +1 @@
+../license-header.txt
\ No newline at end of file
diff --git a/direct/io-elastic/pom.xml b/direct/io-elastic/pom.xml
new file mode 100644
index 000000000..bc49cfccb
--- /dev/null
+++ b/direct/io-elastic/pom.xml
@@ -0,0 +1,89 @@
+
+
+
+ 4.0.0
+
+
+ cz.o2.proxima
+ proxima-direct
+ 0.7-SNAPSHOT
+
+
+ proxima-direct-io-elastic
+ jar
+
+ ${project.groupId}:${project.artifactId}
+
+
+
+
+ cz.o2.proxima
+ proxima-direct-core
+ ${project.version}
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ 7.9.3
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.6
+
+
+
+ cz.o2.proxima
+ proxima-core
+ ${project.version}
+ tests
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit.jupiter.version}
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
+
+
+ org.mockito
+ mockito-core
+
+
+
+
+
+
diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java
new file mode 100644
index 000000000..8206bf581
--- /dev/null
+++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.direct.elastic;
+
+import com.google.common.annotations.VisibleForTesting;
+import cz.o2.proxima.direct.commitlog.CommitLogReader;
+import cz.o2.proxima.direct.core.AttributeWriterBase;
+import cz.o2.proxima.direct.core.Context;
+import cz.o2.proxima.direct.core.DataAccessor;
+import cz.o2.proxima.repository.EntityDescriptor;
+import cz.o2.proxima.storage.AbstractStorage;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+
+@Slf4j
+public class ElasticAccessor extends AbstractStorage implements DataAccessor {
+ private static final long serialVersionUID = 1L;
+
+ private static final String CFG_PREFIX = "elastic.";
+
+ static final String DEFAULT_SCHEME = "http";
+ static final int DEFAULT_CONNECT_TIMEOUT_MS = 5_000;
+ static final int DEFAULT_CONNECTION_REQUEST_MS = 10_000;
+ static final int DEFAULT_SOCKET_TIMEOUT_MS = 60_000;
+ static final int DEFAULT_CONCURRENT_REQUESTS = 1;
+ static final int DEFAULT_BATCH_SIZE = 100;
+ static final String DEFAULT_KEYSTORE_TYPE = "PKCS12";
+
+ @Getter private final Map cfg;
+ @Getter private final String scheme;
+ @Getter private final String indexName;
+ @Getter private final int connectTimeoutMs;
+ @Getter private final int connectionRequestTimeoutMs;
+ @Getter private final int socketTimeoutMs;
+ @Getter private final int concurrentRequests;
+ @Getter private final int batchSize;
+ @Getter private final String keystoreType;
+ @Getter private final String keystorePath;
+ @Getter private final String keystorePassword;
+ @Getter private final String truststorePath;
+ @Getter private final String truststorePassword;
+
+ public ElasticAccessor(EntityDescriptor entityDesc, URI uri, Map cfg) {
+ super(entityDesc, uri);
+ this.cfg = cfg;
+ this.scheme = getStringConfig("scheme", DEFAULT_SCHEME);
+ this.connectTimeoutMs = getIntConfig("connect-timeout-ms", DEFAULT_CONNECT_TIMEOUT_MS);
+ this.connectionRequestTimeoutMs =
+ getIntConfig("connection-request-timeout-ms", DEFAULT_CONNECTION_REQUEST_MS);
+ this.socketTimeoutMs = getIntConfig("socket-timeout-ms", DEFAULT_SOCKET_TIMEOUT_MS);
+ this.concurrentRequests =
+ getIntConfig("concurrent-batch-requests", DEFAULT_CONCURRENT_REQUESTS);
+ this.batchSize = getIntConfig("batch-size", DEFAULT_BATCH_SIZE);
+ this.keystoreType = getStringConfig("keystore-type", DEFAULT_KEYSTORE_TYPE);
+ this.keystorePath = getStringConfig("keystore-path");
+ this.keystorePassword = getStringConfig("keystore-password");
+ this.truststorePath = getStringConfig("truststore-path");
+ this.truststorePassword = getStringConfig("truststore-password");
+ this.indexName = parseIndexName(uri);
+ }
+
+ @VisibleForTesting
+ public static String parseIndexName(URI uri) {
+ String path = uri.getPath();
+ while (path.endsWith("/")) {
+ path = path.substring(0, path.length() - 1);
+ }
+ if (path.length() <= 1) {
+ throw new IllegalArgumentException(
+ "Invalid path in elastic URI " + uri + ". The path represents name of index");
+ }
+ return path.substring(1);
+ }
+
+ @Override
+ public Optional getWriter(Context context) {
+ if (getUri().getScheme().startsWith("elastic")) {
+ return Optional.of(new ElasticWriter(this));
+ }
+
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional getCommitLogReader(Context context) {
+ return Optional.empty();
+ }
+
+ public RestClient getRestClient() {
+ return ElasticClientFactory.create(
+ new ElasticClientFactory.Configuration(
+ getScheme(),
+ getUri().getAuthority(),
+ getConnectTimeoutMs(),
+ getSocketTimeoutMs(),
+ getConnectionRequestTimeoutMs(),
+ getKeystoreType(),
+ getKeystorePath(),
+ getKeystorePassword(),
+ getTruststorePath(),
+ getTruststorePassword()));
+ }
+
+ public RestHighLevelClient getRestHighLevelClient() {
+ return new RestHighLevelClient(
+ ElasticClientFactory.createBuilder(
+ new ElasticClientFactory.Configuration(
+ getScheme(),
+ getUri().getAuthority(),
+ getConnectTimeoutMs(),
+ getSocketTimeoutMs(),
+ getConnectionRequestTimeoutMs(),
+ getKeystoreType(),
+ getKeystorePath(),
+ getKeystorePassword(),
+ getTruststorePath(),
+ getTruststorePassword())));
+ }
+
+ private int getIntConfig(String key, int defaultValue) {
+ return Integer.parseInt(cfg.getOrDefault(CFG_PREFIX + key, defaultValue).toString());
+ }
+
+ private String getStringConfig(String key) {
+ return getStringConfig(key, "");
+ }
+
+ private String getStringConfig(String key, String defaultValue) {
+ return cfg.getOrDefault(CFG_PREFIX + key, defaultValue).toString();
+ }
+}
diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java
new file mode 100644
index 000000000..acd1e328c
--- /dev/null
+++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.direct.elastic;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import lombok.Builder;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpHost;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+@Slf4j
+public class ElasticClientFactory {
+
+ @Value
+ @Builder
+ public static class Configuration {
+ @Builder.Default String scheme = "http";
+ @Builder.Default String hostnames = "";
+ @Builder.Default int connectTimeoutMs = 5_000;
+ @Builder.Default int socketTimeoutMs = 30_000;
+ @Builder.Default int connectionRequestTimeoutMs = 10_000;
+ @Builder.Default String keystoreType = "PKCS12";
+ @Builder.Default String keystorePath = "";
+ @Builder.Default String keystorePassword = "";
+ @Builder.Default String truststorePath = "";
+ @Builder.Default String truststorePassword = "";
+ }
+
+ public static RestClientBuilder createBuilder(Configuration config) {
+ final RestClientBuilder builder =
+ RestClient.builder(parseHosts(config.hostnames, config.getScheme()))
+ .setRequestConfigCallback(createRequestConfigCallback(config));
+
+ if ("https".equalsIgnoreCase(config.getScheme())) {
+ builder.setHttpClientConfigCallback(createConfigurationCallback(config));
+ }
+
+ return builder;
+ }
+
+ public static RestClient create(Configuration config) {
+ return createBuilder(config).build();
+ }
+
+ @VisibleForTesting
+ public static HttpHost[] parseHosts(String hostnames, String scheme) {
+ final List httpHosts =
+ Arrays.stream(hostnames.split(","))
+ .map(
+ p -> {
+ String[] parts = p.split(":");
+ if (parts.length == 1) {
+ return new HttpHost(parts[0], 9200, scheme);
+ }
+
+ if (parts.length == 2) {
+ return new HttpHost(parts[0], Integer.parseInt(parts[1]), scheme);
+ }
+
+ throw new IllegalArgumentException("Invalid host " + p);
+ })
+ .collect(Collectors.toList());
+
+ final HttpHost[] hostsArray = new HttpHost[httpHosts.size()];
+ return httpHosts.toArray(hostsArray);
+ }
+
+ private static RestClientBuilder.RequestConfigCallback createRequestConfigCallback(
+ Configuration config) {
+ return requestConfigBuilder ->
+ requestConfigBuilder
+ .setConnectTimeout(config.getConnectTimeoutMs())
+ .setSocketTimeout(config.getSocketTimeoutMs())
+ .setConnectionRequestTimeout(config.getConnectionRequestTimeoutMs());
+ }
+
+ private static RestClientBuilder.HttpClientConfigCallback createConfigurationCallback(
+ Configuration config) {
+ try {
+ SSLContextBuilder sslBuilder = SSLContexts.custom();
+ loadClientKeyStore(sslBuilder, config);
+ loadTrustStore(sslBuilder, config);
+ final SSLContext sslContext = sslBuilder.build();
+ return httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext);
+
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new IllegalArgumentException("Cannot initialize SSLContext", e);
+ }
+ }
+
+ private static void loadClientKeyStore(SSLContextBuilder sslBuilder, Configuration config) {
+ if (config.getKeystorePath().isEmpty()) {
+ log.warn("No client keystore configured.");
+ return;
+ }
+
+ try {
+ final String pass =
+ config.getKeystorePassword().isEmpty() ? null : config.getKeystorePassword();
+ log.info(
+ "Using keystore: {}, Password protected: {}", config.getKeystorePath(), pass != null);
+
+ final KeyStore clientKeyStore = createKeyStore(config.getKeystorePath(), pass, config);
+ sslBuilder.loadKeyMaterial(clientKeyStore, pass == null ? null : pass.toCharArray());
+ } catch (KeyStoreException
+ | NoSuchAlgorithmException
+ | UnrecoverableKeyException
+ | CertificateException e) {
+
+ throw new IllegalArgumentException("Cannot load keystore: " + config.getKeystorePath(), e);
+ }
+ }
+
+ private static void loadTrustStore(SSLContextBuilder sslBuilder, Configuration config) {
+ if (config.getTruststorePath().isEmpty()) {
+ log.info("No truststore configured.");
+ return;
+ }
+
+ try {
+ final String pass =
+ config.getTruststorePassword().isEmpty() ? null : config.getTruststorePassword();
+ log.info(
+ "Using truststore: {}, Password protected: {}", config.getTruststorePath(), pass != null);
+ final KeyStore clientKeyStore = createKeyStore(config.getTruststorePath(), pass, config);
+ sslBuilder.loadTrustMaterial(clientKeyStore, new TrustSelfSignedStrategy());
+ } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
+ throw new IllegalArgumentException(
+ "Cannot load truststore: " + config.getTruststorePath(), e);
+ }
+ }
+
+ private static KeyStore createKeyStore(
+ String keyStorePath, @Nullable String keyStorePassword, Configuration config)
+ throws KeyStoreException, CertificateException, NoSuchAlgorithmException {
+ final KeyStore keyStore = KeyStore.getInstance(config.getKeystoreType());
+ final File keyStoreFile = new File(keyStorePath);
+ if (!keyStoreFile.exists()) {
+ throw new IllegalArgumentException("Couldn't find file: " + keyStorePath);
+ } else {
+ char[] keyStorePasswordChars =
+ keyStorePassword == null ? null : keyStorePassword.toCharArray();
+ try (InputStream is = Files.newInputStream(keyStoreFile.toPath())) {
+ keyStore.load(is, keyStorePasswordChars);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Couldn't load file: " + keyStorePath);
+ }
+ }
+ return keyStore;
+ }
+}
diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java
new file mode 100644
index 000000000..294309486
--- /dev/null
+++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.direct.elastic;
+
+import com.google.common.collect.Sets;
+import cz.o2.proxima.direct.core.DataAccessorFactory;
+import cz.o2.proxima.direct.core.DirectDataOperator;
+import cz.o2.proxima.repository.AttributeFamilyDescriptor;
+import java.net.URI;
+
+public class ElasticStorage implements DataAccessorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ElasticAccessor createAccessor(DirectDataOperator op, AttributeFamilyDescriptor family) {
+
+ return new ElasticAccessor(family.getEntity(), family.getStorageUri(), family.getCfg());
+ }
+
+ @Override
+ public Accept accepts(URI uri) {
+ return Sets.newHashSet("elastic", "elasticsearch").contains(uri.getScheme())
+ ? Accept.ACCEPT
+ : Accept.REJECT;
+ }
+}
diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java
new file mode 100644
index 000000000..e723d1d27
--- /dev/null
+++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.direct.elastic;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonObject;
+import cz.o2.proxima.direct.core.CommitCallback;
+import cz.o2.proxima.direct.core.OnlineAttributeWriter;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.storage.StreamElement;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+
+@Slf4j
+public class ElasticWriter implements OnlineAttributeWriter, BulkProcessor.Listener {
+ private final ElasticAccessor accessor;
+ private final RestHighLevelClient client;
+ private final Map callbacksToCommit = new ConcurrentHashMap<>();
+ private final BulkProcessor bulkProcessor;
+
+ public ElasticWriter(ElasticAccessor accessor) {
+ this.accessor = accessor;
+ this.client = accessor.getRestHighLevelClient();
+ this.bulkProcessor =
+ BulkProcessor.builder(
+ (request, bulkListener) ->
+ client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+ this)
+ .setBulkActions(accessor.getBatchSize())
+ .setConcurrentRequests(accessor.getConcurrentRequests())
+ .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
+ .build();
+ }
+
+ @Override
+ public URI getUri() {
+ return accessor.getUri();
+ }
+
+ @Override
+ public void write(StreamElement element, CommitCallback commitCallback) {
+ Preconditions.checkArgument(!element.isDelete(), "Delete not supported.");
+ Preconditions.checkArgument(
+ !element.getAttributeDescriptor().isWildcard(), "Wildcard not supported.");
+
+ final IndexRequest request =
+ new IndexRequest(accessor.getIndexName())
+ .id(element.getKey())
+ .opType(DocWriteRequest.OpType.INDEX)
+ .source(toJson(element), XContentType.JSON);
+
+ callbacksToCommit.put(request, commitCallback);
+ bulkProcessor.add(request);
+ }
+
+ @VisibleForTesting
+ public String toJson(StreamElement element) {
+ final JsonObject jsonObject = new JsonObject();
+
+ jsonObject.addProperty("key", element.getKey());
+ jsonObject.addProperty("entity", element.getEntityDescriptor().getName());
+ jsonObject.addProperty("attribute", element.getAttribute());
+ jsonObject.addProperty("timestamp", element.getStamp());
+ jsonObject.addProperty("uuid", element.getUuid());
+ jsonObject.addProperty("updated_at", System.currentTimeMillis());
+
+ final Optional