Elastic Search Source #95

Merged
merged 40 commits into from
Nov 7, 2024
40 commits
ec62226
boilerplate
parikshitg Oct 1, 2024
1be548d
no more needed
parikshitg Oct 1, 2024
5b88778
fix: config
parikshitg Oct 2, 2024
13eb083
fix: updated config
parikshitg Oct 7, 2024
ce390a2
feat: search api
parikshitg Oct 7, 2024
7de7f7b
feat: position
parikshitg Oct 8, 2024
e978736
feat: open source
parikshitg Oct 8, 2024
fdf6846
feat: updated client search, timeout handled
parikshitg Oct 8, 2024
5bf7410
feat: worker definition
parikshitg Oct 8, 2024
b2bb72c
feat: source read
parikshitg Oct 8, 2024
adc8acf
feat: unmarshal position
parikshitg Oct 8, 2024
bfe0ca3
feat: source ack
parikshitg Oct 8, 2024
86b6070
feat: source teardown
parikshitg Oct 8, 2024
1136568
feat: graceful shutdown of workers
parikshitg Oct 8, 2024
61bc43c
feat: polling
parikshitg Oct 8, 2024
2099181
fix: es client search definition
parikshitg Oct 9, 2024
f0b894b
fix: linters
parikshitg Oct 9, 2024
c1c0522
fix: cleanup
parikshitg Oct 9, 2024
a9be98f
fix: add missing tag in config
parikshitg Oct 9, 2024
d36199e
fix: small fix in record metadata
parikshitg Oct 10, 2024
5a91d3b
feat: updated position
parikshitg Oct 10, 2024
862243e
feat: cleanup
parikshitg Oct 10, 2024
ed485c4
feat: added source in readme
parikshitg Oct 11, 2024
e71b907
fix: updated modules
parikshitg Oct 11, 2024
3bd6502
feat: posititon update and test
parikshitg Oct 11, 2024
f19eb7b
fix: linters and cleanup
parikshitg Oct 11, 2024
1b0fc75
Merge branch 'main' of github.com:conduitio-labs/conduit-connector-el…
parikshitg Oct 11, 2024
ed87a32
fix: some pr comments
parikshitg Oct 17, 2024
4a5502a
feat: search after
parikshitg Oct 17, 2024
73a2000
Merge branch 'main' of github.com:conduitio-labs/conduit-connector-el…
parikshitg Oct 17, 2024
633523f
fix: config used map[string]type
parikshitg Oct 18, 2024
0d4d39f
fix: pr comment esapi modules updated
parikshitg Oct 18, 2024
0bc3124
fix: go.sum
parikshitg Oct 18, 2024
5425b23
fix: sort by seq_no
parikshitg Oct 24, 2024
4812fd7
Merge branch 'main' of github.com:conduitio-labs/conduit-connector-el…
parikshitg Oct 24, 2024
8a9a426
fix: handled zeroth record scenario
parikshitg Oct 25, 2024
961b6f9
fix: used context for shutdown
parikshitg Oct 25, 2024
09827c2
fix: removed source from worker
parikshitg Oct 25, 2024
def6c6e
fix: pr comments
parikshitg Nov 4, 2024
7de418b
Merge branch 'main' of github.com:conduitio-labs/conduit-connector-el…
parikshitg Nov 6, 2024
21 changes: 21 additions & 0 deletions README.md
@@ -35,6 +35,26 @@ For any other action a warning entry is added to log and Record is skipped.
| `bulkSize` | The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. Note that values greater than `1000` may require additional service configuration. | `true` | `"1000"` |
| `retries` | The maximum number of retries of failed operations. The minimum value is `0`, which disables retry logic. The maximum value is `255`. Note that the higher the value, the longer it may take to process retries and, as a result, to ingest subsequent operations. | `true` | `"1000"` |


# Source
The Elasticsearch source connector reads data from one or more Elasticsearch indexes, identified by the configured `host` and `indexes`. It uses the Elasticsearch Search API to pull data from each index. On startup it pulls the existing data in batches of `batchSize`; once all the data has been retrieved, it polls the Search API every `pollingPeriod` for new records.

## Configuration Options
| name | description | required | default |
|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|----------|
| `version` | The version of the Elasticsearch service. Currently supports version `8`. | `true` | |
| `host` | The Elasticsearch host and port (e.g.: http://127.0.0.1:9200). | `true` | |
| `username` | [v: 5, 6, 7, 8] The username for HTTP Basic Authentication. | `false` | |
| `password` | [v: 5, 6, 7, 8] The password for HTTP Basic Authentication. | `true` when username was provided, `false` otherwise | |
| `cloudId` | [v: 6, 7, 8] Endpoint for the Elastic Service (https://elastic.co/cloud). | `false` | |
| `apiKey` | [v: 6, 7, 8] Base64-encoded token for authorization; if set, overrides username/password and service token. | `false` | |
| `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | |
| `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | |
| `indexes` | The names of the indexes to read data from. | `true` | |
| `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` |
| `pollingPeriod` | The interval at which the Search API is polled for new records. | `false` | `"5s"` |


# Testing

Run `make test` to run all the unit and integration tests, which require Docker to be installed and running. The command will handle starting and stopping docker containers for you.
@@ -57,5 +77,6 @@ docker-compose -f test/docker-compose.v8.overrides.yml -f test/docker-compose.v8

- https://github.com/elastic/go-elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-your-data.html

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=715ebf4a-148c-44ad-8f64-6cc5780d34ae)
3 changes: 2 additions & 1 deletion connector.go
@@ -16,11 +16,12 @@ package elasticsearch

import (
"github.com/conduitio-labs/conduit-connector-elasticsearch/destination"
"github.com/conduitio-labs/conduit-connector-elasticsearch/source"
sdk "github.com/conduitio/conduit-connector-sdk"
)

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: nil,
NewSource: source.NewSource,
NewDestination: destination.NewDestination,
}
51 changes: 51 additions & 0 deletions destination/client_moq_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
@@ -12,6 +12,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.15.0
github.com/golangci/golangci-lint v1.61.0
github.com/jaswdr/faker v1.19.1
github.com/matryer/is v1.4.1
github.com/matryer/moq v0.5.0
github.com/stretchr/testify v1.9.0
go.uber.org/goleak v1.3.0
@@ -128,7 +129,6 @@ require (
github.com/maratori/testableexamples v1.0.0 // indirect
github.com/maratori/testpackage v1.1.1 // indirect
github.com/matoous/godox v0.0.0-20240105082147-c5b5e0e7c0c0 // indirect
github.com/matryer/is v1.4.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
36 changes: 36 additions & 0 deletions internal/elasticsearch/api/search.go
@@ -0,0 +1,36 @@
// Copyright © 2024 Meroxa, Inc. and Miquido
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

// SearchRequest holds the parameters for a call to the Elasticsearch Search API.
type SearchRequest struct {
Index string `json:"index"`
Size *int `json:"size"`
SearchAfter []int64 `json:"searchAfter"`
SortBy string `json:"sortBy"`
Order string `json:"order"`
}

// SearchResponse is the JSON response from Elasticsearch search query.
type SearchResponse struct {
Hits struct {
Hits []struct {
Index string `json:"_index"`
ID string `json:"_id"`
Source map[string]any `json:"_source"`
Sort []int64 `json:"sort"` // used for search_after
} `json:"hits"`
} `json:"hits"`
}
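The `Sort` field is what makes `search_after` paging possible: the sort values of the last hit on one page become the cursor for the next request. The sketch below decodes a response body and extracts that cursor. It uses a local copy of the response shape so that it compiles on its own; `lastSort` and `samplePage` are hypothetical names, and the sample body is hand-written for illustration rather than captured from a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// searchResponse mirrors the shape of api.SearchResponse shown above.
type searchResponse struct {
	Hits struct {
		Hits []struct {
			Index  string         `json:"_index"`
			ID     string         `json:"_id"`
			Source map[string]any `json:"_source"`
			Sort   []int64        `json:"sort"`
		} `json:"hits"`
	} `json:"hits"`
}

// lastSort returns the sort values of the final hit, which a follow-up
// request would pass as search_after. The bool is false for an empty page.
func lastSort(body []byte) ([]int64, bool, error) {
	var resp searchResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, false, fmt.Errorf("decode search response: %w", err)
	}
	hits := resp.Hits.Hits
	if len(hits) == 0 {
		return nil, false, nil
	}
	return hits[len(hits)-1].Sort, true, nil
}

// samplePage is a trimmed, hand-written response body.
const samplePage = `{"hits":{"hits":[
  {"_index":"users","_id":"a","_source":{"name":"Ann"},"sort":[0]},
  {"_index":"users","_id":"b","_source":{"name":"Bob"},"sort":[1]}
]}}`

func main() {
	cursor, ok, err := lastSort([]byte(samplePage))
	fmt.Println(cursor, ok, err) // prints "[1] true <nil>"
}
```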
4 changes: 4 additions & 0 deletions internal/elasticsearch/client.go
@@ -18,6 +18,7 @@ import (
"context"
"io"

"github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api"
"github.com/conduitio/conduit-commons/opencdc"
)

@@ -39,4 +40,7 @@ type Client interface {

// PrepareDeleteOperation prepares delete operation definition for Bulk API query.
PrepareDeleteOperation(key string, index string) (metadata interface{}, err error)

// Search calls the Elasticsearch Search API and returns the SearchResponse read from an index.
Search(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error)
}
8 changes: 8 additions & 0 deletions internal/elasticsearch/v5/client.go
@@ -21,7 +21,9 @@ import (
"fmt"
"io"

"github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api"
"github.com/conduitio/conduit-commons/opencdc"

"github.com/elastic/go-elasticsearch/v5"
)

@@ -159,3 +161,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) {
return itemPayload.Bytes(), nil
}
}

// Search calls the Elasticsearch Search API and returns the SearchResponse read from an index.
func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) {
// TODO: implement elasticsearch search api
return nil, fmt.Errorf("method not implemented")
}
8 changes: 8 additions & 0 deletions internal/elasticsearch/v6/client.go
@@ -21,7 +21,9 @@ import (
"fmt"
"io"

"github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api"
"github.com/conduitio/conduit-commons/opencdc"

"github.com/elastic/go-elasticsearch/v6"
)

@@ -162,3 +164,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) {
return itemPayload.Bytes(), nil
}
}

// Search calls the Elasticsearch Search API and returns the SearchResponse read from an index.
func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) {
// TODO: implement elasticsearch search api
return nil, fmt.Errorf("method not implemented")
}
8 changes: 8 additions & 0 deletions internal/elasticsearch/v7/client.go
@@ -21,7 +21,9 @@ import (
"fmt"
"io"

"github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api"
"github.com/conduitio/conduit-commons/opencdc"

"github.com/elastic/go-elasticsearch/v7"
)

@@ -161,3 +163,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) {
return itemPayload.Bytes(), nil
}
}

// Search calls the Elasticsearch Search API and returns the SearchResponse read from an index.
func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) {
// TODO: implement elasticsearch search api
return nil, fmt.Errorf("method not implemented")
}
82 changes: 82 additions & 0 deletions internal/elasticsearch/v8/search.go
@@ -0,0 +1,82 @@
// Copyright © 2022 Meroxa, Inc. and Miquido
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v8

import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api"

"github.com/elastic/go-elasticsearch/v8/esapi"
)

// Search calls the Elasticsearch Search API and returns the SearchResponse read from an index.
func (c *Client) Search(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) {
// Create the search request
req := esapi.SearchRequest{
Index: []string{request.Index},
Body: strings.NewReader(createSearchBody(request.SearchAfter)),
Size: request.Size,
}

// Perform the request
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err := req.Do(ctx, c.es)
if err != nil {
return nil, fmt.Errorf("error getting search response: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return nil, fmt.Errorf("search request returned an error response: %s", res.String())
}

var response *api.SearchResponse
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("error parsing the search response body: %w", err)
}

return response, nil
}

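// createSearchBody builds the JSON body for a match_all query sorted by
// _seq_no in ascending order. When searchAfter holds exactly one value
// (one per sort key), it is passed as search_after so that the response
// starts after the last record already read.
func createSearchBody(searchAfter []int64) string {
	body := map[string]interface{}{
		"query": map[string]interface{}{
			"match_all": struct{}{},
		},
		"sort": []map[string]interface{}{
			{"_seq_no": map[string]string{
				"order": "asc",
			}},
		},
	}

	// An empty searchAfter means this is the first page, so no cursor is sent.
	if len(searchAfter) == 1 {
		body["search_after"] = searchAfter
	}

	jsonBody, err := json.Marshal(body)
	if err != nil {
		// The error is only logged; the caller then receives an empty body.
		log.Printf("error marshaling the search request body: %s", err)
	}

	return string(jsonBody)
}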
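To make the request body concrete, the following sketch builds the same `match_all` query sorted by `_seq_no` and prints it with and without a cursor. `buildSearchBody` is a hypothetical variant, not the function from this PR: it returns the marshaling error to the caller instead of logging it, which is one possible alternative design.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildSearchBody is a hypothetical variant of createSearchBody that
// surfaces the marshaling error instead of logging it.
func buildSearchBody(searchAfter []int64) (string, error) {
	body := map[string]any{
		"query": map[string]any{"match_all": struct{}{}},
		"sort": []map[string]any{
			{"_seq_no": map[string]string{"order": "asc"}},
		},
	}

	// One sort key means the cursor carries exactly one value.
	if len(searchAfter) == 1 {
		body["search_after"] = searchAfter
	}

	b, err := json.Marshal(body)
	if err != nil {
		return "", fmt.Errorf("marshal search body: %w", err)
	}
	return string(b), nil
}

func main() {
	first, _ := buildSearchBody(nil)
	next, _ := buildSearchBody([]int64{42})
	fmt.Println(first)
	// {"query":{"match_all":{}},"sort":[{"_seq_no":{"order":"asc"}}]}
	fmt.Println(next)
	// {"query":{"match_all":{}},"search_after":[42],"sort":[{"_seq_no":{"order":"asc"}}]}
}
```

`encoding/json` writes map keys in sorted order, so the output is stable across runs.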
Empty file removed source/.gitkeep
Empty file.