Add Elastic Search 6.8.3 compatibility library #10

Open · wants to merge 1 commit into master

107 changes: 107 additions & 0 deletions persistence/sensorhub-storage-es-rest/README.md
@@ -0,0 +1,107 @@
# Elasticsearch storage implementation

This storage module stores and retrieves data to/from an Elasticsearch 6+ server. It uses the Elasticsearch
[Java High Level REST Client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html).

The differences with the sensorhub-storage-es module are:

- it uses the Java High Level REST Client, which sends HTTP requests instead of serialized Java requests
- it converts each data frame into Elasticsearch fields instead of storing it as an opaque serialized Java object (except for the OSH metadata)

The part of the driver that creates the Elasticsearch index may not support your DataComponent; in that case, open an issue with the specification of the unsupported DataComponent (the missing fields).

Three main OSH interfaces have been implemented:
1. IRecordStorageModule
2. IObsStorageModule
3. IMultiSourceStorage

## Main classes

An iterator wrapper class, ESIterator, wraps scroll responses so that callers never have to manage the scroll id themselves:
it takes care of issuing a new scroll request with the current scroll id whenever more results are needed. The underlying scroll pattern it hides is sketched below.
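
For reference, this is roughly the scroll sequence that ESIterator automates with the 6.8 High Level REST Client. It is only an illustrative sketch, not the driver's actual code; the index name is a placeholder, and the scroll duration/fetch size correspond to the scrollMaxDuration and scrollFetchSize settings described further down.

```java
import java.io.IOException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class ScrollExample {
    public static void scrollAll(RestHighLevelClient client, String index,
                                 int scrollFetchSize, long scrollMaxDurationMs) throws IOException {
        // First request opens the scroll context and returns the first page
        SearchRequest searchRequest = new SearchRequest(index)
                .scroll(TimeValue.timeValueMillis(scrollMaxDurationMs))
                .source(new SearchSourceBuilder().size(scrollFetchSize));
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

        // Each following page is fetched with the scroll id of the previous response
        while (response.getHits().getHits().length > 0) {
            for (SearchHit hit : response.getHits().getHits()) {
                // consume hit.getSourceAsMap() ...
            }
            SearchScrollRequest scrollRequest = new SearchScrollRequest(response.getScrollId())
                    .scroll(TimeValue.timeValueMillis(scrollMaxDurationMs));
            response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
        }

        // Release the scroll context once iteration is done
        ClearScrollRequest clearScroll = new ClearScrollRequest();
        clearScroll.addScrollId(response.getScrollId());
        client.clearScroll(clearScroll, RequestOptions.DEFAULT);
    }
}
```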

A bulk processor is in charge of sending create/update/delete requests. The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
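
As an illustration only (not the driver's exact wiring), a BulkProcessor bound to the REST client can be built as follows; the builder options map onto the bulkActions, bulkSize, bulkFlushInterval and bulkConcurrentRequests settings listed below:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class BulkProcessorExample {
    public static BulkProcessor build(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override public void beforeBulk(long executionId, BulkRequest request) { }
            @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }
            @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        };
        return BulkProcessor.builder(
                // delegate actual transport to the client's asynchronous bulk API
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
            .setBulkActions(10_000)                              // flush every n requests (bulkActions)
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) // flush every n MB (bulkSize)
            .setFlushInterval(TimeValue.timeValueSeconds(10))    // flush every n seconds (bulkFlushInterval)
            .setConcurrentRequests(1)                            // bulkConcurrentRequests
            .build();
    }
}
```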

Some settings are available through the ESBasicStorageConfig class (a usage sketch follows the list):
- clusterName: ES cluster name
- user: Elasticsearch user for authentication (leave blank if not required)
- password: Elasticsearch password for authentication
- autoRefresh: refresh the store on commit; requires indices:admin/refresh rights
- filterByStorageId: multiple storage instances can share the same index; if filtering is disabled, this driver sees all sensors (should be used only for a read-only SOS service)
- certificatesPath: list of additional SSL certificates for the Elasticsearch connection
- nodeUrls: list of nodes in the format `<host>:<port>`
- indexNamePrepend: string prepended to the data name to form the index name
- indexNameMetaData: index name for the OpenSensorHub metadata
- scrollMaxDuration: when scrolling, the maximum duration (in ms) the scroll remains usable if no further results are fetched
- scrollFetchSize: when scrolling, the number of results fetched by each Elasticsearch call
- connectTimeout: timeout in milliseconds until a connection is established; a value of zero means an infinite timeout
- socketTimeout: socket timeout (SO_TIMEOUT) in milliseconds, i.e. the maximum period of inactivity between two consecutive data packets
- maxRetryTimeout: maximum timeout (in milliseconds) to honour in case of multiple retries of the same request
- bulkConcurrentRequests: number of concurrent bulk requests
- bulkActions: execute the bulk every n requests
- bulkSize: flush the bulk every n MB
- bulkFlushInterval: flush the bulk every n seconds regardless of the number of requests
- maxBulkRetry: number of times the client resends a failed bulk insertion after a timeout exception; retry is disabled by default to avoid overloading the Elasticsearch cluster
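
A rough sketch of how these options might be set, assuming (as is usual for OSH module configurations) that they are exposed as public fields; the field types and values below are inferred from the descriptions above, not taken from the actual class:

```java
ESBasicStorageConfig config = new ESBasicStorageConfig();
config.nodeUrls = java.util.Arrays.asList("localhost:9200"); // <host>:<port>
config.user = "";                    // blank: no authentication
config.autoRefresh = false;
config.indexNamePrepend = "osh_";
config.scrollMaxDuration = 60_000;   // ms
config.scrollFetchSize = 5_000;
config.connectTimeout = 5_000;       // ms
config.bulkActions = 10_000;         // flush every 10 000 requests
config.bulkSize = 10;                // MB
config.bulkFlushInterval = 10;       // s
config.bulkConcurrentRequests = 1;
```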

A dedicated parser in this driver creates an appropriate default Elasticsearch index mapping for each OSH DataComponent.
You can override this mapping using Elasticsearch tools (e.g. Kibana).
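
A mapping can also be overridden programmatically with the same REST client; this is only an illustrative sketch, and the index name, mapping type and field name are placeholders:

```java
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class OverrideMappingExample {
    public static void overrideMapping(RestHighLevelClient client) throws java.io.IOException {
        PutMappingRequest request = new PutMappingRequest("osh_my_sensor_data")   // placeholder index
                .type("sensorLocation")                                           // mapping type of that index
                .source("{\"properties\":{\"myField\":{\"type\":\"keyword\"}}}", XContentType.JSON);
        client.indices().putMapping(request, RequestOptions.DEFAULT);
    }
}
```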

## Mappings

These are the different mappings, depending on the storage used:

1. Open Sensor Hub specific metadata
```json
{
  "mapping": {
    "osh_metadata": {
      "properties": {
        "blob": {
          "type": "binary"
        },
        "index": {
          "type": "keyword"
        },
        "metadataType": {
          "type": "keyword"
        },
        "storageID": {
          "type": "keyword"
        },
        "timestamp": {
          "type": "date",
          "format": "epoch_millis"
        }
      }
    }
  }
}
```

2. Sensor Location
```json
{
  "mapping": {
    "sensorLocation": {
      "dynamic": "false",
      "properties": {
        "location": {
          "type": "geo_point"
        },
        "location_height": {
          "type": "double"
        },
        "producerID": {
          "type": "keyword"
        },
        "storageID": {
          "type": "keyword"
        },
        "timestamp": {
          "type": "date",
          "format": "epoch_millis"
        }
      }
    }
  }
}
```
48 changes: 48 additions & 0 deletions persistence/sensorhub-storage-es-rest/build.gradle
@@ -0,0 +1,48 @@
description = 'Elasticsearch storage'
ext.details = 'Storage based on the Elasticsearch database'
version = '1.0.1-SNAPSHOT'

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
compile 'org.sensorhub:sensorhub-core:' + oshCoreVersion
compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:6.8.3'
compile 'com.esotericsoftware:kryo:4.0.0'


compile "org.locationtech.spatial4j:spatial4j:0.7"
compile "org.locationtech.jts:jts-core:1.15.0"

testCompile group:'org.sensorhub', name:'sensorhub-core', version:oshCoreVersion, configuration: 'testArtifacts'
}

// only run tests using mock DB automatically
// exclude tests requiring connection to the database; these have to be run manually
test {
exclude '**/integration/*.class'
}

// add info to OSGi manifest
jar {
manifest {
instruction 'Bundle-Vendor', 'Sensia Software LLC'
instruction 'Bundle-Activator', 'org.sensorhub.process.math.Activator'
}
}

// add info to maven pom
ext.pom >>= {
developers {
developer {
id 'mdhsl'
name 'Mathieu Dhainaut'
}
developer {
id 'nicolas-f'
name 'Nicolas Fortin'
organization 'UGE - UMRAE'
}
}
}

@@ -0,0 +1,212 @@
package org.elasticsearch.client;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import static java.util.Collections.emptySet;
import static org.elasticsearch.client.RequestConverters.createContentType;
import static org.elasticsearch.client.RequestConverters.enforceSameContentType;

/**
 * Variant of {@link RestHighLevelClient} that GZIP-compresses the body of bulk requests
 * before sending them, reducing network traffic for large inserts.
 * It is placed in the org.elasticsearch.client package so it can reuse the package-private
 * {@link RequestConverters} helpers (createContentType, enforceSameContentType).
 */
public class CompressedClient extends RestHighLevelClient {
private static final String GZIP_CODEC = "gzip";


public CompressedClient(RestClientBuilder restClientBuilder) {
super(restClientBuilder);
}

public CompressedClient(RestClientBuilder restClientBuilder, List<NamedXContentRegistry.Entry> namedXContentEntries) {
super(restClientBuilder, namedXContentEntries);
}

public CompressedClient(RestClient restClient, CheckedConsumer<RestClient, IOException> doClose, List<NamedXContentRegistry.Entry> namedXContentEntries) {
super(restClient, doClose, namedXContentEntries);
}


/**
* Asynchronously executes a bulk request using the Bulk API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
*/
public final void bulkCompressedAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(bulkRequest, CompressedClient::bulkCompressed, BulkResponse::fromXContent, listener, emptySet(), headers);
}


static Request bulkCompressed(BulkRequest bulkRequest) throws IOException {


Request request = new Request(HttpPost.METHOD_NAME, "/_bulk");

RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withTimeout(bulkRequest.timeout());
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
parameters.withPipeline(bulkRequest.pipeline());
parameters.withRouting(bulkRequest.routing());
// Bulk API only supports newline delimited JSON or Smile. Before executing
// the bulk, we need to check that all requests have the same content-type
// and this content-type is supported by the Bulk API.
XContentType bulkContentType = null;
for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
DocWriteRequest<?> action = bulkRequest.requests().get(i);

DocWriteRequest.OpType opType = action.opType();
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
bulkContentType = enforceSameContentType((IndexRequest) action, bulkContentType);

} else if (opType == DocWriteRequest.OpType.UPDATE) {
UpdateRequest updateRequest = (UpdateRequest) action;
if (updateRequest.doc() != null) {
bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType);
}
if (updateRequest.upsertRequest() != null) {
bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType);
}
}
}

if (bulkContentType == null) {
bulkContentType = XContentType.JSON;
}

final byte separator = bulkContentType.xContent().streamSeparator();
final ContentType requestContentType = createContentType(bulkContentType);

ByteArrayOutputStream content = new ByteArrayOutputStream();
for (DocWriteRequest<?> action : bulkRequest.requests()) {
DocWriteRequest.OpType opType = action.opType();

try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) {
metadata.startObject();
{
metadata.startObject(opType.getLowercase());
if (Strings.hasLength(action.index())) {
metadata.field("_index", action.index());
}
if (Strings.hasLength(action.type())) {
metadata.field("_type", action.type());
}
if (Strings.hasLength(action.id())) {
metadata.field("_id", action.id());
}
if (Strings.hasLength(action.routing())) {
metadata.field("routing", action.routing());
}
if (Strings.hasLength(action.parent())) {
metadata.field("parent", action.parent());
}
if (action.version() != Versions.MATCH_ANY) {
metadata.field("version", action.version());
}

VersionType versionType = action.versionType();
if (versionType != VersionType.INTERNAL) {
if (versionType == VersionType.EXTERNAL) {
metadata.field("version_type", "external");
} else if (versionType == VersionType.EXTERNAL_GTE) {
metadata.field("version_type", "external_gte");
} else if (versionType == VersionType.FORCE) {
metadata.field("version_type", "force");
}
}

if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
metadata.field("if_seq_no", action.ifSeqNo());
metadata.field("if_primary_term", action.ifPrimaryTerm());
}

if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
if (Strings.hasLength(indexRequest.getPipeline())) {
metadata.field("pipeline", indexRequest.getPipeline());
}
} else if (opType == DocWriteRequest.OpType.UPDATE) {
UpdateRequest updateRequest = (UpdateRequest) action;
if (updateRequest.retryOnConflict() > 0) {
metadata.field("retry_on_conflict", updateRequest.retryOnConflict());
}
if (updateRequest.fetchSource() != null) {
metadata.field("_source", updateRequest.fetchSource());
}
}
metadata.endObject();
}
metadata.endObject();

BytesRef metadataSource = BytesReference.bytes(metadata).toBytesRef();
content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length);
content.write(separator);
}

BytesRef source = null;
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
BytesReference indexSource = indexRequest.source();
XContentType indexXContentType = indexRequest.getContentType();

try (XContentParser parser = XContentHelper.createParser(
/*
* EMPTY and THROW are fine here because we just call
* copyCurrentStructure which doesn't touch the
* registry or deprecation.
*/
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
indexSource, indexXContentType)) {
try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) {
builder.copyCurrentStructure(parser);
source = BytesReference.bytes(builder).toBytesRef();
}
}
} else if (opType == DocWriteRequest.OpType.UPDATE) {
source = XContentHelper.toXContent((UpdateRequest) action, bulkContentType, false).toBytesRef();
}

if (source != null) {
content.write(source.bytes, source.offset, source.length);
content.write(separator);
}
}

byte[] original = content.toByteArray();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(original.length);
// try-with-resources finishes the GZIP stream and releases its deflater
try (GZIPOutputStream gzip = new GZIPOutputStream(byteArrayOutputStream)) {
    gzip.write(original);
}
ByteArrayEntity entity = new ByteArrayEntity(byteArrayOutputStream.toByteArray(), requestContentType);
entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_ENCODING, GZIP_CODEC));
request.setEntity(entity);
return request;
}
}