Skip to content

Commit

Permalink
Merge pull request #3 from threatgrid/cleanup
Browse files Browse the repository at this point in the history
Add option to specify how to construct record's key from es document
  • Loading branch information
DeLaGuardo authored Oct 18, 2023
2 parents 62fb7c7 + c634793 commit 5813b3a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 66 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ JSON-encoded string to be used as a `"query"` field in search requests.
`sort`
JSON-encoded string to be used as the `"sort"` field in search requests. The sorting criteria must include enough fields to allow `search_after` pagination to pick up new documents.

* Type: string
* Default: null
* Importance: high

`key.field`
(Optional) Name of the document field whose string value is used as the record key. If not set, records are produced with a null key.

* Type: string
* Default: null
* Importance: low

### Connector Configuration

`poll.interval.ms`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,17 @@ public ElasticsearchSourceConnectorConfig(final Map<?, ?> originalProps) {
private static final String BATCH_MAX_ROWS_DISPLAY = "Max Documents Per Batch";

public static final String QUERY_CONF = "query";
private static final String QUERY_DOC = "JSON encoded query to filter documents. Must be in the form of '{\"query\": ...}'";
private static final String QUERY_DOC = "JSON encoded query to filter documents. Must be in the form of '{\"query\": query}'";
private static final String QUERY_DISPLAY = "Query";

public static final String SORT_CONF = "sort";
private static final String SORT_DOC = "JSON encoded sorting criteria. Must be in the form of '{\"sort\": [...]}'. Sorting criteria will be used to enable smooth pagination via search_after.";
private static final String SORT_DOC = "JSON encoded sorting criteria. Must be in the form of '{\"sort\": sort}'. Sorting criteria will be used to enable smooth pagination via search_after.";
private static final String SORT_DISPLAY = "Sort";

public static final String KEY_FIELD_CONF = "key.field";
private static final String KEY_FIELD_DOC = "Field name common for all documents sutable to be used as a record key.";
private static final String KEY_FIELD_DISPLAY = "Key field";

public static final String INDEX_NAME_CONF = "index";
private static final String INDEX_NAME_DOC = "Elasticsearch index name to fetch data from.";
private static final String INDEX_NAME_DEFAULT = null;
Expand Down Expand Up @@ -247,7 +251,16 @@ private static void addQueryOptions(ConfigDef config) {
REQUEST_GROUP,
++orderInGroup,
Width.MEDIUM,
SORT_DISPLAY);
SORT_DISPLAY)
.define(KEY_FIELD_CONF,
Type.STRING,
null,
Importance.LOW,
KEY_FIELD_DOC,
REQUEST_GROUP,
++orderInGroup,
Width.MEDIUM,
KEY_FIELD_DISPLAY);
}

private static void addConnectorOptions(ConfigDef config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ElasticsearchSourceConnectorTask extends SourceTask {
private String searchAfterJson;
private String queryJson;
private String sortJson;
private String keyField;
private String index;
private int pollingMs;
private int batchMaxRows;
Expand All @@ -55,6 +56,7 @@ public void start(Map<String, String> props) {
index = config.getString(ElasticsearchSourceConnectorConfig.INDEX_NAME_CONF);
searchAfterJson = getLastOffset(index);
sortJson = config.getString(ElasticsearchSourceConnectorConfig.SORT_CONF);
keyField = config.getString(ElasticsearchSourceConnectorConfig.KEY_FIELD_CONF);
queryJson = config.getString(ElasticsearchSourceConnectorConfig.QUERY_CONF);
pollingMs = config.getInt(ElasticsearchSourceConnectorConfig.POLL_INTERVAL_MS_CONF);
batchMaxRows = config.getInt(ElasticsearchSourceConnectorConfig.BATCH_MAX_ROWS_CONF);
Expand Down Expand Up @@ -135,10 +137,10 @@ public List<SourceRecord> poll() throws InterruptedException {

Map<String, String> sourcePartition = Collections.singletonMap("index", index);
Map<String, String> sourceOffset = Collections.singletonMap("search_after", searchAfterJson);
String key = elasticDocument.get("id").textValue();
String key = keyField != null ? elasticDocument.get(keyField).textValue() : null;
String value = mapper.writeValueAsString(elasticDocument);

SourceRecord sourceRecord = new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value);
SourceRecord sourceRecord = new SourceRecord(sourcePartition, sourceOffset, topic, key != null ? Schema.STRING_SCHEMA : null, key, Schema.STRING_SCHEMA, value);
results.add(sourceRecord);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"key.converter" "org.apache.kafka.connect.storage.StringConverter"
"query" "{\"match_all\": {}}"
"sort" "[{\"@timestamp\": {\"order\": \"asc\"}}, \"id\"]"
"key.field" "id"
"topic" topic
"value.converter" "org.apache.kafka.connect.storage.StringConverter"}})
:content-type :json}))
Expand Down

0 comments on commit 5813b3a

Please sign in to comment.