diff --git a/direct/io-elastic/license-header-spotless.txt b/direct/io-elastic/license-header-spotless.txt new file mode 120000 index 000000000..077ffe5c5 --- /dev/null +++ b/direct/io-elastic/license-header-spotless.txt @@ -0,0 +1 @@ +../license-header-spotless.txt \ No newline at end of file diff --git a/direct/io-elastic/license-header.txt b/direct/io-elastic/license-header.txt new file mode 120000 index 000000000..9e0b4f073 --- /dev/null +++ b/direct/io-elastic/license-header.txt @@ -0,0 +1 @@ +../license-header.txt \ No newline at end of file diff --git a/direct/io-elastic/pom.xml b/direct/io-elastic/pom.xml new file mode 100644 index 000000000..bc49cfccb --- /dev/null +++ b/direct/io-elastic/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + + cz.o2.proxima + proxima-direct + 0.7-SNAPSHOT + + + proxima-direct-io-elastic + jar + + ${project.groupId}:${project.artifactId} + + + + + cz.o2.proxima + proxima-direct-core + ${project.version} + + + + org.projectlombok + lombok + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.9.3 + + + + com.google.code.gson + gson + 2.8.6 + + + + cz.o2.proxima + proxima-core + ${project.version} + tests + test + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + + + + org.mockito + mockito-core + + + + + + diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java new file mode 100644 index 000000000..8206bf581 --- /dev/null +++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticAccessor.java @@ -0,0 +1,149 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import com.google.common.annotations.VisibleForTesting; +import cz.o2.proxima.direct.commitlog.CommitLogReader; +import cz.o2.proxima.direct.core.AttributeWriterBase; +import cz.o2.proxima.direct.core.Context; +import cz.o2.proxima.direct.core.DataAccessor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.storage.AbstractStorage; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; + +@Slf4j +public class ElasticAccessor extends AbstractStorage implements DataAccessor { + private static final long serialVersionUID = 1L; + + private static final String CFG_PREFIX = "elastic."; + + static final String DEFAULT_SCHEME = "http"; + static final int DEFAULT_CONNECT_TIMEOUT_MS = 5_000; + static final int DEFAULT_CONNECTION_REQUEST_MS = 10_000; + static final int DEFAULT_SOCKET_TIMEOUT_MS = 60_000; + static final int DEFAULT_CONCURRENT_REQUESTS = 1; + static final int DEFAULT_BATCH_SIZE = 100; + static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + + @Getter private final Map cfg; + @Getter private final String scheme; + @Getter private final String indexName; + @Getter private final int connectTimeoutMs; + @Getter private final int connectionRequestTimeoutMs; + @Getter private final int socketTimeoutMs; + @Getter private final int concurrentRequests; + @Getter private final int batchSize; + @Getter private final String keystoreType; + @Getter private final String keystorePath; + @Getter private final String keystorePassword; + @Getter private final String truststorePath; + @Getter private final String truststorePassword; + + public ElasticAccessor(EntityDescriptor entityDesc, URI uri, Map cfg) { + super(entityDesc, uri); + this.cfg = cfg; + this.scheme = getStringConfig("scheme", DEFAULT_SCHEME); + this.connectTimeoutMs = getIntConfig("connect-timeout-ms", DEFAULT_CONNECT_TIMEOUT_MS); + this.connectionRequestTimeoutMs = + getIntConfig("connection-request-timeout-ms", DEFAULT_CONNECTION_REQUEST_MS); + this.socketTimeoutMs = getIntConfig("socket-timeout-ms", DEFAULT_SOCKET_TIMEOUT_MS); + this.concurrentRequests = + getIntConfig("concurrent-batch-requests", DEFAULT_CONCURRENT_REQUESTS); + this.batchSize = getIntConfig("batch-size", DEFAULT_BATCH_SIZE); + this.keystoreType = getStringConfig("keystore-type", DEFAULT_KEYSTORE_TYPE); + this.keystorePath = getStringConfig("keystore-path"); + this.keystorePassword = getStringConfig("keystore-password"); + this.truststorePath = getStringConfig("truststore-path"); + this.truststorePassword = getStringConfig("truststore-password"); + this.indexName = parseIndexName(uri); + } + + @VisibleForTesting + public static String parseIndexName(URI uri) { + String path = uri.getPath(); + while (path.endsWith("/")) { + path = path.substring(0, path.length() - 1); + } + if (path.length() <= 1) { + throw new IllegalArgumentException( + "Invalid path in elastic URI " + uri + ". The path represents name of index"); + } + return path.substring(1); + } + + @Override + public Optional getWriter(Context context) { + if (getUri().getScheme().startsWith("elastic")) { + return Optional.of(new ElasticWriter(this)); + } + + return Optional.empty(); + } + + @Override + public Optional getCommitLogReader(Context context) { + return Optional.empty(); + } + + public RestClient getRestClient() { + return ElasticClientFactory.create( + new ElasticClientFactory.Configuration( + getScheme(), + getUri().getAuthority(), + getConnectTimeoutMs(), + getSocketTimeoutMs(), + getConnectionRequestTimeoutMs(), + getKeystoreType(), + getKeystorePath(), + getKeystorePassword(), + getTruststorePath(), + getTruststorePassword())); + } + + public RestHighLevelClient getRestHighLevelClient() { + return new RestHighLevelClient( + ElasticClientFactory.createBuilder( + new ElasticClientFactory.Configuration( + getScheme(), + getUri().getAuthority(), + getConnectTimeoutMs(), + getSocketTimeoutMs(), + getConnectionRequestTimeoutMs(), + getKeystoreType(), + getKeystorePath(), + getKeystorePassword(), + getTruststorePath(), + getTruststorePassword()))); + } + + private int getIntConfig(String key, int defaultValue) { + return Integer.parseInt(cfg.getOrDefault(CFG_PREFIX + key, defaultValue).toString()); + } + + private String getStringConfig(String key) { + return getStringConfig(key, ""); + } + + private String getStringConfig(String key, String defaultValue) { + return cfg.getOrDefault(CFG_PREFIX + key, defaultValue).toString(); + } +} diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java new file mode 100644 index 000000000..acd1e328c --- /dev/null +++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticClientFactory.java @@ -0,0 +1,184 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import lombok.Builder; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHost; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +@Slf4j +public class ElasticClientFactory { + + @Value + @Builder + public static class Configuration { + @Builder.Default String scheme = "http"; + @Builder.Default String hostnames = ""; + @Builder.Default int connectTimeoutMs = 5_000; + @Builder.Default int socketTimeoutMs = 30_000; + @Builder.Default int connectionRequestTimeoutMs = 10_000; + @Builder.Default String keystoreType = "PKCS12"; + @Builder.Default String keystorePath = ""; + @Builder.Default String keystorePassword = ""; + @Builder.Default String truststorePath = ""; + @Builder.Default String truststorePassword = ""; + } + + public static RestClientBuilder createBuilder(Configuration config) { + final RestClientBuilder builder = + RestClient.builder(parseHosts(config.hostnames, config.getScheme())) + .setRequestConfigCallback(createRequestConfigCallback(config)); + + if ("https".equalsIgnoreCase(config.getScheme())) { + builder.setHttpClientConfigCallback(createConfigurationCallback(config)); + } + + return builder; + } + + public static RestClient create(Configuration config) { + return createBuilder(config).build(); + } + + @VisibleForTesting + public static HttpHost[] parseHosts(String hostnames, String scheme) { + final List httpHosts = + Arrays.stream(hostnames.split(",")) + .map( + p -> { + String[] parts = p.split(":"); + if (parts.length == 1) { + return new HttpHost(parts[0], 9200, scheme); + } + + if (parts.length == 2) { + return new HttpHost(parts[0], Integer.parseInt(parts[1]), scheme); + } + + throw new IllegalArgumentException("Invalid host " + p); + }) + .collect(Collectors.toList()); + + final HttpHost[] hostsArray = new HttpHost[httpHosts.size()]; + return httpHosts.toArray(hostsArray); + } + + private static RestClientBuilder.RequestConfigCallback createRequestConfigCallback( + Configuration config) { + return requestConfigBuilder -> + requestConfigBuilder + .setConnectTimeout(config.getConnectTimeoutMs()) + .setSocketTimeout(config.getSocketTimeoutMs()) + .setConnectionRequestTimeout(config.getConnectionRequestTimeoutMs()); + } + + private static RestClientBuilder.HttpClientConfigCallback createConfigurationCallback( + Configuration config) { + try { + SSLContextBuilder sslBuilder = SSLContexts.custom(); + loadClientKeyStore(sslBuilder, config); + loadTrustStore(sslBuilder, config); + final SSLContext sslContext = sslBuilder.build(); + return httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext); + + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new IllegalArgumentException("Cannot initialize SSLContext", e); + } + } + + private static void loadClientKeyStore(SSLContextBuilder sslBuilder, Configuration config) { + if (config.getKeystorePath().isEmpty()) { + log.warn("No client keystore configured."); + return; + } + + try { + final String pass = + config.getKeystorePassword().isEmpty() ? null : config.getKeystorePassword(); + log.info( + "Using keystore: {}, Password protected: {}", config.getKeystorePath(), pass != null); + + final KeyStore clientKeyStore = createKeyStore(config.getKeystorePath(), pass, config); + sslBuilder.loadKeyMaterial(clientKeyStore, pass == null ? null : pass.toCharArray()); + } catch (KeyStoreException + | NoSuchAlgorithmException + | UnrecoverableKeyException + | CertificateException e) { + + throw new IllegalArgumentException("Cannot load keystore: " + config.getKeystorePath(), e); + } + } + + private static void loadTrustStore(SSLContextBuilder sslBuilder, Configuration config) { + if (config.getTruststorePath().isEmpty()) { + log.info("No truststore configured."); + return; + } + + try { + final String pass = + config.getTruststorePassword().isEmpty() ? null : config.getTruststorePassword(); + log.info( + "Using truststore: {}, Password protected: {}", config.getTruststorePath(), pass != null); + final KeyStore clientKeyStore = createKeyStore(config.getTruststorePath(), pass, config); + sslBuilder.loadTrustMaterial(clientKeyStore, new TrustSelfSignedStrategy()); + } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException e) { + throw new IllegalArgumentException( + "Cannot load truststore: " + config.getTruststorePath(), e); + } + } + + private static KeyStore createKeyStore( + String keyStorePath, @Nullable String keyStorePassword, Configuration config) + throws KeyStoreException, CertificateException, NoSuchAlgorithmException { + final KeyStore keyStore = KeyStore.getInstance(config.getKeystoreType()); + final File keyStoreFile = new File(keyStorePath); + if (!keyStoreFile.exists()) { + throw new IllegalArgumentException("Couldn't find file: " + keyStorePath); + } else { + char[] keyStorePasswordChars = + keyStorePassword == null ? null : keyStorePassword.toCharArray(); + try (InputStream is = Files.newInputStream(keyStoreFile.toPath())) { + keyStore.load(is, keyStorePasswordChars); + } catch (IOException e) { + throw new IllegalArgumentException("Couldn't load file: " + keyStorePath); + } + } + return keyStore; + } +} diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java new file mode 100644 index 000000000..294309486 --- /dev/null +++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticStorage.java @@ -0,0 +1,40 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import com.google.common.collect.Sets; +import cz.o2.proxima.direct.core.DataAccessorFactory; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.repository.AttributeFamilyDescriptor; +import java.net.URI; + +public class ElasticStorage implements DataAccessorFactory { + + private static final long serialVersionUID = 1L; + + @Override + public ElasticAccessor createAccessor(DirectDataOperator op, AttributeFamilyDescriptor family) { + + return new ElasticAccessor(family.getEntity(), family.getStorageUri(), family.getCfg()); + } + + @Override + public Accept accepts(URI uri) { + return Sets.newHashSet("elastic", "elasticsearch").contains(uri.getScheme()) + ? Accept.ACCEPT + : Accept.REJECT; + } +} diff --git a/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java new file mode 100644 index 000000000..e723d1d27 --- /dev/null +++ b/direct/io-elastic/src/main/java/cz/o2/proxima/direct/elastic/ElasticWriter.java @@ -0,0 +1,160 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.gson.JsonObject; +import cz.o2.proxima.direct.core.CommitCallback; +import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.storage.StreamElement; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentType; + +@Slf4j +public class ElasticWriter implements OnlineAttributeWriter, BulkProcessor.Listener { + private final ElasticAccessor accessor; + private final RestHighLevelClient client; + private final Map callbacksToCommit = new ConcurrentHashMap<>(); + private final BulkProcessor bulkProcessor; + + public ElasticWriter(ElasticAccessor accessor) { + this.accessor = accessor; + this.client = accessor.getRestHighLevelClient(); + this.bulkProcessor = + BulkProcessor.builder( + (request, bulkListener) -> + client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), + this) + .setBulkActions(accessor.getBatchSize()) + .setConcurrentRequests(accessor.getConcurrentRequests()) + .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) + .build(); + } + + @Override + public URI getUri() { + return accessor.getUri(); + } + + @Override + public void write(StreamElement element, CommitCallback commitCallback) { + Preconditions.checkArgument(!element.isDelete(), "Delete not supported."); + Preconditions.checkArgument( + !element.getAttributeDescriptor().isWildcard(), "Wildcard not supported."); + + final IndexRequest request = + new IndexRequest(accessor.getIndexName()) + .id(element.getKey()) + .opType(DocWriteRequest.OpType.INDEX) + .source(toJson(element), XContentType.JSON); + + callbacksToCommit.put(request, commitCallback); + bulkProcessor.add(request); + } + + @VisibleForTesting + public String toJson(StreamElement element) { + final JsonObject jsonObject = new JsonObject(); + + jsonObject.addProperty("key", element.getKey()); + jsonObject.addProperty("entity", element.getEntityDescriptor().getName()); + jsonObject.addProperty("attribute", element.getAttribute()); + jsonObject.addProperty("timestamp", element.getStamp()); + jsonObject.addProperty("uuid", element.getUuid()); + jsonObject.addProperty("updated_at", System.currentTimeMillis()); + + final Optional data = element.getParsed(); + if (data.isPresent()) { + @SuppressWarnings("unchecked") + final AttributeDescriptor attributeDescriptor = + (AttributeDescriptor) element.getAttributeDescriptor(); + final String dataJson = attributeDescriptor.getValueSerializer().asJsonValue(data.get()); + jsonObject.addProperty("data", "${data}"); + return jsonObject.toString().replace("\"${data}\"", dataJson); + } + + return jsonObject.toString(); + } + + @Override + public void close() { + try { + bulkProcessor.close(); + client.close(); + } catch (IOException e) { + log.warn("Closing problem", e); + throw new RuntimeException(e); + } + } + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + log.debug("Bulk starting with executionId: {}", executionId); + } + + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) { + log.debug("Bulk with executionId: {} finished successfully ", executionId); + final List> requests = bulkRequest.requests(); + Arrays.stream(bulkResponse.getItems()) + .forEach( + resp -> { + final IndexRequest request = (IndexRequest) requests.get(resp.getItemId()); + Preconditions.checkState( + request.id().equals(resp.getId()), + "Request document id doesn't match with response document id"); + final CommitCallback callback = + Objects.requireNonNull(callbacksToCommit.remove(request)); + if (resp.isFailed()) { + callback.commit(false, resp.getFailure().getCause()); + } else { + callback.commit(true, null); + } + }); + } + + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) { + log.warn(String.format("Bulk with executionId: %s finished with error", executionId), failure); + bulkRequest + .requests() + .forEach(r -> Objects.requireNonNull(callbacksToCommit.remove(r)).commit(false, failure)); + } + + @Override + public Factory asFactory() { + return repo -> new ElasticWriter(accessor); + } +} diff --git a/direct/io-elastic/src/main/resources/META-INF/services/cz.o2.proxima.direct.core.DataAccessorFactory b/direct/io-elastic/src/main/resources/META-INF/services/cz.o2.proxima.direct.core.DataAccessorFactory new file mode 100644 index 000000000..b7ab2bf18 --- /dev/null +++ b/direct/io-elastic/src/main/resources/META-INF/services/cz.o2.proxima.direct.core.DataAccessorFactory @@ -0,0 +1 @@ +cz.o2.proxima.direct.elastic.ElasticStorage diff --git a/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticAccessorTest.java b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticAccessorTest.java new file mode 100644 index 000000000..075a58fac --- /dev/null +++ b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticAccessorTest.java @@ -0,0 +1,116 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import static cz.o2.proxima.direct.elastic.ElasticAccessor.*; +import static cz.o2.proxima.direct.elastic.ElasticAccessor.DEFAULT_KEYSTORE_TYPE; +import static cz.o2.proxima.direct.elastic.ElasticAccessor.DEFAULT_SOCKET_TIMEOUT_MS; +import static cz.o2.proxima.direct.elastic.ElasticAccessor.parseIndexName; +import static org.junit.jupiter.api.Assertions.*; + +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.repository.Repository; +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class ElasticAccessorTest { + + private static final String MODEL = + "{\n" + + " entities: {\n" + + " test {\n" + + " attributes {\n" + + " data: { scheme: \"string\" }\n" + + " }\n" + + " }\n" + + " }\n" + + "\n" + + " attributeFamilies: {\n" + + " test_storage_stream {\n" + + " entity: test\n" + + " attributes: [ data ]\n" + + " storage: \"inmem:///test_inmem\"\n" + + " type: primary\n" + + " access: commit-log\n" + + " }\n" + + " }\n" + + "\n" + + "}\n"; + + private final Repository repository = Repository.of(ConfigFactory.parseString(MODEL)); + + @Test + public void testConfigurationDefault() { + ElasticAccessor accessor = + new ElasticAccessor( + repository.getEntity("test"), + URI.create("elastic://example.com/my_index"), + Collections.emptyMap()); + assertEquals(DEFAULT_SCHEME, accessor.getScheme()); + assertEquals(DEFAULT_CONNECT_TIMEOUT_MS, accessor.getConnectTimeoutMs()); + assertEquals(DEFAULT_CONNECTION_REQUEST_MS, accessor.getConnectionRequestTimeoutMs()); + assertEquals(DEFAULT_SOCKET_TIMEOUT_MS, accessor.getSocketTimeoutMs()); + assertEquals("my_index", accessor.getIndexName()); + assertEquals(DEFAULT_KEYSTORE_TYPE, accessor.getKeystoreType()); + assertEquals("", accessor.getKeystorePassword()); + assertEquals("", accessor.getKeystorePath()); + assertEquals("", accessor.getTruststorePath()); + assertEquals("", accessor.getTruststorePassword()); + } + + @Test + public void testConfiguration() { + Map cfg = + new HashMap() { + { + put("elastic.scheme", "https"); + put("elastic.connect-timeout-ms", 10); + put("elastic.connection-request-timeout-ms", 20); + put("elastic.socket-timeout-ms", 30); + put("elastic.keystore-type", "JKS"); + put("elastic.keystore-path", "/opt/k1"); + put("elastic.keystore-password", "secret"); + put("elastic.truststore-path", "/opt/k2"); + put("elastic.truststore-password", "secret2"); + } + }; + + ElasticAccessor accessor = + new ElasticAccessor( + repository.getEntity("test"), URI.create("elastic://example.com/my_index"), cfg); + assertEquals("https", accessor.getScheme()); + assertEquals(10, accessor.getConnectTimeoutMs()); + assertEquals(20, accessor.getConnectionRequestTimeoutMs()); + assertEquals(30, accessor.getSocketTimeoutMs()); + assertEquals("my_index", accessor.getIndexName()); + assertEquals("JKS", accessor.getKeystoreType()); + assertEquals("/opt/k1", accessor.getKeystorePath()); + assertEquals("secret", accessor.getKeystorePassword()); + assertEquals("/opt/k2", accessor.getTruststorePath()); + assertEquals("secret2", accessor.getTruststorePassword()); + } + + @Test + public void testParseIndexName() { + assertEquals( + "my_index", parseIndexName(URI.create("elastic://example.com:9093/my_index/?query=2"))); + assertEquals("my_index", parseIndexName(URI.create("elastic://example.com/my_index"))); + assertEquals("my_index", parseIndexName(URI.create("elastic://example.com/my_index/"))); + } +} diff --git a/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticClientFactoryTest.java b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticClientFactoryTest.java new file mode 100644 index 000000000..ee13358b3 --- /dev/null +++ b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticClientFactoryTest.java @@ -0,0 +1,37 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import static cz.o2.proxima.direct.elastic.ElasticClientFactory.parseHosts; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.http.HttpHost; +import org.junit.jupiter.api.Test; + +class ElasticClientFactoryTest { + + @Test + public void testParseHosts() { + HttpHost[] hosts = parseHosts("example.com:9093,example2.com", "http"); + assertEquals(2, hosts.length); + assertEquals("example.com", hosts[0].getHostName()); + assertEquals(9093, hosts[0].getPort()); + assertEquals("http", hosts[0].getSchemeName()); + assertEquals("example2.com", hosts[1].getHostName()); + assertEquals(9200, hosts[1].getPort()); + assertEquals("http", hosts[1].getSchemeName()); + } +} diff --git a/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticStorageTest.java b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticStorageTest.java new file mode 100644 index 000000000..a5675ab0a --- /dev/null +++ b/direct/io-elastic/src/test/java/cz/o2/proxima/direct/elastic/ElasticStorageTest.java @@ -0,0 +1,33 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.direct.elastic; + +import static cz.o2.proxima.storage.internal.AbstractDataAccessorFactory.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.net.URI; +import org.junit.jupiter.api.Test; + +class ElasticStorageTest { + + @Test + public void testAccept() { + ElasticStorage storage = new ElasticStorage(); + assertEquals(Accept.ACCEPT, storage.accepts(URI.create("elastic://asdas"))); + assertEquals(Accept.ACCEPT, storage.accepts(URI.create("elasticsearch://asdas"))); + assertEquals(Accept.REJECT, storage.accepts(URI.create("es://asdas"))); + } +} diff --git a/direct/pom.xml b/direct/pom.xml index 3556c066b..c37672b22 100644 --- a/direct/pom.xml +++ b/direct/pom.xml @@ -83,6 +83,7 @@ io-bulkfs io-bulkfs-parquet io-cassandra + io-elastic io-gcloud-storage io-hadoop io-hbase @@ -113,6 +114,7 @@ io-bulkfs io-bulkfs-parquet io-cassandra + io-elastic io-gcloud-storage io-http io-kafka