From ec622261d9b18c8afdaad62b4c4b9cb1ff660478 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 1 Oct 2024 09:24:29 +0530 Subject: [PATCH 01/36] boilerplate --- source/config.go | 84 ++++++++++++++++++++++++++++++++++++ source/paramgen.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ source/source.go | 92 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 280 insertions(+) create mode 100644 source/config.go create mode 100644 source/paramgen.go create mode 100644 source/source.go diff --git a/source/config.go b/source/config.go new file mode 100644 index 0000000..13cd77f --- /dev/null +++ b/source/config.go @@ -0,0 +1,84 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate paramgen -output=paramgen.go Config + +package source + +import ( + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch" +) + +type Config struct { + // The version of the Elasticsearch service. One of: 5, 6, 7, 8. + Version elasticsearch.Version `json:"version" validate:"required"` + // The Elasticsearch host and port (e.g.: http://127.0.0.1:9200). + Host string `json:"host" validate:"required"` + // The username for HTTP Basic Authentication. + Username string `json:"username"` + // The password for HTTP Basic Authentication. + Password string `json:"password"` + // Endpoint for the Elastic Service (https://elastic.co/cloud). + CloudID string `json:"cloudID"` + // Base64-encoded token for authorization; if set, overrides username/password and service token. + APIKey string `json:"APIKey"` + // Service token for authorization; if set, overrides username/password. + ServiceToken string `json:"serviceToken"` + // SHA256 hex fingerprint given by Elasticsearch on first launch. + CertificateFingerprint string `json:"certificateFingerprint"` + // The name of the index to write the data to. + Index string `json:"index"` + // The name of the index's type to write the data to. + Type string `json:"type"` + // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. + BulkSize uint64 `json:"bulkSize" default:"1000"` + // The maximum number of retries of failed operations. The minimum value is `0` which disabled retry logic. The maximum value is `255. + Retries uint8 `json:"retries" default:"0"` +} + +func (c Config) GetHost() string { + return c.Host +} + +func (c Config) GetUsername() string { + return c.Username +} + +func (c Config) GetPassword() string { + return c.Password +} + +func (c Config) GetCloudID() string { + return c.CloudID +} + +func (c Config) GetAPIKey() string { + return c.APIKey +} + +func (c Config) GetServiceToken() string { + return c.ServiceToken +} + +func (c Config) GetCertificateFingerprint() string { + return c.CertificateFingerprint +} + +func (c Config) GetIndex() string { + return c.Index +} + +func (c Config) GetType() string { + return c.Type +} diff --git a/source/paramgen.go b/source/paramgen.go new file mode 100644 index 0000000..1476354 --- /dev/null +++ b/source/paramgen.go @@ -0,0 +1,104 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen + +package source + +import ( + "github.com/conduitio/conduit-commons/config" +) + +const ( + ConfigAPIKey = "APIKey" + ConfigBulkSize = "bulkSize" + ConfigCertificateFingerprint = "certificateFingerprint" + ConfigCloudID = "cloudID" + ConfigHost = "host" + ConfigIndex = "index" + ConfigPassword = "password" + ConfigRetries = "retries" + ConfigServiceToken = "serviceToken" + ConfigType = "type" + ConfigUsername = "username" + ConfigVersion = "version" +) + +func (Config) Parameters() map[string]config.Parameter { + return map[string]config.Parameter{ + ConfigAPIKey: { + Default: "", + Description: "Base64-encoded token for authorization; if set, overrides username/password and service token.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigBulkSize: { + Default: "1000", + Description: "The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`.", + Type: config.ParameterTypeInt, + Validations: []config.Validation{}, + }, + ConfigCertificateFingerprint: { + Default: "", + Description: "SHA256 hex fingerprint given by Elasticsearch on first launch.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigCloudID: { + Default: "", + Description: "Endpoint for the Elastic Service (https://elastic.co/cloud).", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigHost: { + Default: "", + Description: "The Elasticsearch host and port (e.g.: http://127.0.0.1:9200).", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigIndex: { + Default: "", + Description: "The name of the index to write the data to.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigPassword: { + Default: "", + Description: "The password for HTTP Basic Authentication.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigRetries: { + Default: "0", + Description: "The maximum number of retries of failed operations. The minimum value is `0` which disabled retry logic. The maximum value is `255.", + Type: config.ParameterTypeInt, + Validations: []config.Validation{}, + }, + ConfigServiceToken: { + Default: "", + Description: "Service token for authorization; if set, overrides username/password.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigType: { + Default: "", + Description: "The name of the index's type to write the data to.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigUsername: { + Default: "", + Description: "The username for HTTP Basic Authentication.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigVersion: { + Default: "", + Description: "The version of the Elasticsearch service. One of: 5, 6, 7, 8.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + } +} diff --git a/source/source.go b/source/source.go new file mode 100644 index 0000000..6827dde --- /dev/null +++ b/source/source.go @@ -0,0 +1,92 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "fmt" + + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +type Source struct { + sdk.UnimplementedSource + + config Config + client client + iterator Iterator +} + +type client = elasticsearch.Client + +// NewSource initialises a new source. +func NewSource() sdk.Source { + return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) +} + +// Parameters returns a map of named Parameters that describe how to configure the Source. +func (s *Source) Parameters() config.Parameters { + return s.config.Parameters() +} + +// Configure parses and stores configurations, +// returns an error in case of invalid configuration. +func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error { + sdk.Logger(ctx).Info().Msg("Configuring ElasticSearch Source...") + + err := sdk.Util.ParseConfig(ctx, cfgRaw, &s.config, NewSource().Parameters()) + if err != nil { + return err + } + + return nil +} + +// Open parses the position and initializes the iterator. +func (s *Source) Open(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Info().Msg("Opening an ElasticSearch Source...") + return nil +} + +// Read returns the next record. +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { + sdk.Logger(ctx).Debug().Msg("Reading a record from ElasticSearch Source...") + return opencdc.Record{}, nil +} + +// Ack logs the debug event with the position. +func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Trace(). + Str("position", string(position)). + Msg("got ack") + + return nil +} + +// Teardown gracefully shutdown connector. +func (s *Source) Teardown(ctx context.Context) error { + sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") + + if s.iterator != nil { + if err := s.iterator.Stop(); err != nil { + return fmt.Errorf("stop iterator: %w", err) + } + } + + return nil +} From 1be548ddc87e658266d49b107ef0ca52c45a22b9 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 1 Oct 2024 12:00:23 +0530 Subject: [PATCH 02/36] no more needed --- source/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 source/.gitkeep diff --git a/source/.gitkeep b/source/.gitkeep deleted file mode 100644 index e69de29..0000000 From 5b887786c06835167d374510c4fb0fef28edf1cd Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Wed, 2 Oct 2024 15:37:08 +0530 Subject: [PATCH 03/36] fix: config --- source/config.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/source/config.go b/source/config.go index 13cd77f..542334d 100644 --- a/source/config.go +++ b/source/config.go @@ -37,14 +37,12 @@ type Config struct { ServiceToken string `json:"serviceToken"` // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` - // The name of the index to write the data to. - Index string `json:"index"` - // The name of the index's type to write the data to. - Type string `json:"type"` + // The name of the indexes to read data from. + Index []string `json:"index"` + // // The name of the index's type to write the data to. + // Type string `json:"type"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. - BulkSize uint64 `json:"bulkSize" default:"1000"` - // The maximum number of retries of failed operations. The minimum value is `0` which disabled retry logic. The maximum value is `255. - Retries uint8 `json:"retries" default:"0"` + BatchSize uint64 `json:"batchSize" default:"1"` } func (c Config) GetHost() string { @@ -75,9 +73,9 @@ func (c Config) GetCertificateFingerprint() string { return c.CertificateFingerprint } -func (c Config) GetIndex() string { - return c.Index -} +// func (c Config) GetIndex() string { +// return c.Index +// } func (c Config) GetType() string { return c.Type From 13eb083b2a3c884e1a6480f48370e0681f0c1fdc Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Mon, 7 Oct 2024 09:37:25 +0530 Subject: [PATCH 04/36] fix: updated config --- source/config.go | 42 ++---------------------------------------- source/paramgen.go | 20 +++----------------- 2 files changed, 5 insertions(+), 57 deletions(-) diff --git a/source/config.go b/source/config.go index 542334d..a2af5ca 100644 --- a/source/config.go +++ b/source/config.go @@ -38,45 +38,7 @@ type Config struct { // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. - Index []string `json:"index"` - // // The name of the index's type to write the data to. - // Type string `json:"type"` + Indexes []string `json:"index"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. - BatchSize uint64 `json:"batchSize" default:"1"` -} - -func (c Config) GetHost() string { - return c.Host -} - -func (c Config) GetUsername() string { - return c.Username -} - -func (c Config) GetPassword() string { - return c.Password -} - -func (c Config) GetCloudID() string { - return c.CloudID -} - -func (c Config) GetAPIKey() string { - return c.APIKey -} - -func (c Config) GetServiceToken() string { - return c.ServiceToken -} - -func (c Config) GetCertificateFingerprint() string { - return c.CertificateFingerprint -} - -// func (c Config) GetIndex() string { -// return c.Index -// } - -func (c Config) GetType() string { - return c.Type + BatchSize uint64 `json:"batchSize" default:"1000"` } diff --git a/source/paramgen.go b/source/paramgen.go index 1476354..e60772d 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -9,15 +9,13 @@ import ( const ( ConfigAPIKey = "APIKey" - ConfigBulkSize = "bulkSize" + ConfigBatchSize = "batchSize" ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" ConfigIndex = "index" ConfigPassword = "password" - ConfigRetries = "retries" ConfigServiceToken = "serviceToken" - ConfigType = "type" ConfigUsername = "username" ConfigVersion = "version" ) @@ -30,7 +28,7 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, - ConfigBulkSize: { + ConfigBatchSize: { Default: "1000", Description: "The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`.", Type: config.ParameterTypeInt, @@ -58,7 +56,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigIndex: { Default: "", - Description: "The name of the index to write the data to.", + Description: "The name of the indexes to read data from.", Type: config.ParameterTypeString, Validations: []config.Validation{}, }, @@ -68,24 +66,12 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, - ConfigRetries: { - Default: "0", - Description: "The maximum number of retries of failed operations. The minimum value is `0` which disabled retry logic. The maximum value is `255.", - Type: config.ParameterTypeInt, - Validations: []config.Validation{}, - }, ConfigServiceToken: { Default: "", Description: "Service token for authorization; if set, overrides username/password.", Type: config.ParameterTypeString, Validations: []config.Validation{}, }, - ConfigType: { - Default: "", - Description: "The name of the index's type to write the data to.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, ConfigUsername: { Default: "", Description: "The username for HTTP Basic Authentication.", From ce390a218f6527e56c5b6b62cccd8ef719abc758 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Mon, 7 Oct 2024 10:46:19 +0530 Subject: [PATCH 05/36] feat: search api --- internal/elasticsearch/client.go | 3 ++ internal/elasticsearch/v5/client.go | 6 +++ internal/elasticsearch/v6/client.go | 6 +++ internal/elasticsearch/v7/client.go | 6 +++ internal/elasticsearch/v8/search.go | 77 +++++++++++++++++++++++++++++ 5 files changed, 98 insertions(+) create mode 100644 internal/elasticsearch/v8/search.go diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index 77c739a..217aedd 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -39,4 +39,7 @@ type Client interface { // PrepareDeleteOperation prepares delete operation definition for Bulk API query. PrepareDeleteOperation(key string) (metadata interface{}, err error) + + // Search calls the elasticsearch search api and retuns a list of records read from an index. + Search(ctx context.Context, index string, offset, size *int) (interface{}, error) } diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index 03a4cdd..2f2e14a 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -159,3 +159,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { return itemPayload.Bytes(), nil } } + +// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { + // TODO: implement elasticsearch search api + return nil, nil +} diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index 287aa62..d94c7d0 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -162,3 +162,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { return itemPayload.Bytes(), nil } } + +// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { + // TODO: implement elasticsearch search api + return nil, nil +} diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index 9106859..f63fb2c 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -161,3 +161,9 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { return itemPayload.Bytes(), nil } } + +// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { + // TODO: implement elasticsearch search api + return nil, nil +} diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go new file mode 100644 index 0000000..6e76068 --- /dev/null +++ b/internal/elasticsearch/v8/search.go @@ -0,0 +1,77 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v8 + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/go-elasticsearch/esapi" +) + +const ( + // metadataFieldIndex is a name of a record metadata field that stores a ElasticSearch Index name. + metadataFieldIndex = "elasticsearch.index" +) + +// SearchReponse is the JSON response from Elasticsearch search query. +type SearchReponse struct { + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []struct { + Index string `json:"index"` + ID string `json:"_id"` + Score float64 `json:"_score"` + Source map[string]any `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + +// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { + // Create the search request + req := esapi.SearchRequest{ + Index: []string{index}, + Body: strings.NewReader(fmt.Sprintf(`{ + "query": { + "match_all": {} + } + }`)), + From: offset, + Size: size, + } + + // Perform the request + res, err := req.Do(ctx, c.es) + if err != nil { + return nil, fmt.Errorf("error getting search response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("error search response: %s", res.String()) + } + + var response SearchReponse + if err := json.NewDecoder(res.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("error parsing the search response body: %s", err) + } + + return response, nil +} From 7de7f7b61dc89983eea310eb67adb09aa476a47b Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 09:54:17 +0530 Subject: [PATCH 06/36] feat: position --- source/position.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 source/position.go diff --git a/source/position.go b/source/position.go new file mode 100644 index 0000000..823575d --- /dev/null +++ b/source/position.go @@ -0,0 +1,54 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "encoding/json" + "fmt" + + "github.com/conduitio/conduit-commons/opencdc" +) + +// Position represents position of a document in an index. +type Position struct { + ID string `json:"id"` + Index string `json:"index"` + Pos int `json:"pos"` +} + +// ParseSDKPosition parses opencdc.Position and returns Position. +func ParseSDKPosition(position opencdc.Position) ([]Position, error) { + var pos []Position + + if position == nil { + return pos, nil + } + + if err := json.Unmarshal(position, &pos); err != nil { + return pos, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) + } + + return pos, nil +} + +// marshal marshals Position and returns opencdc.Position or an error. +func (p Position) marshal() (opencdc.Position, error) { + positionBytes, err := json.Marshal(p) + if err != nil { + return nil, fmt.Errorf("marshal position: %w", err) + } + + return positionBytes, nil +} From e9787362157c253a79e9f34d6f6918f537e5e57f Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 10:05:23 +0530 Subject: [PATCH 07/36] feat: open source --- source/source.go | 55 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/source/source.go b/source/source.go index 6827dde..6d45748 100644 --- a/source/source.go +++ b/source/source.go @@ -27,13 +27,13 @@ import ( type Source struct { sdk.UnimplementedSource - config Config - client client - iterator Iterator + config Config + client elasticsearch.Client + offsets map[string]int + positions []Position + ch chan opencdc.Record } -type client = elasticsearch.Client - // NewSource initialises a new source. func NewSource() sdk.Source { return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) @@ -60,6 +60,41 @@ func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error { // Open parses the position and initializes the iterator. func (s *Source) Open(ctx context.Context, position opencdc.Position) error { sdk.Logger(ctx).Info().Msg("Opening an ElasticSearch Source...") + + var err error + s.positions, err = ParseSDKPosition(position) + if err != nil { + return err + } + + // Initialize Elasticsearch client + s.client, err = elasticsearch.NewClient(s.config.Version, s.config) + if err != nil { + return fmt.Errorf("failed creating client: %w", err) + } + + // Check the connection + if err := s.client.Ping(ctx); err != nil { + return fmt.Errorf("server cannot be pinged: %w", err) + } + + s.ch = make(chan opencdc.Record, s.config.BatchSize) + s.offsets = make(map[string]int) + + for _, index := range s.config.Indexes { + offset := 0 + for _, position := range s.positions { + if index == position.Index { + offset = position.Pos + } + } + + s.offsets[index] = offset + + // a new worker for a new index + NewWorker(s, index, offset) + } + return nil } @@ -82,11 +117,11 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - if s.iterator != nil { - if err := s.iterator.Stop(); err != nil { - return fmt.Errorf("stop iterator: %w", err) - } - } + // if s.iterator != nil { + // if err := s.iterator.Stop(); err != nil { + // return fmt.Errorf("stop iterator: %w", err) + // } + // } return nil } From fdf68465ae8c9d792bb9157c8cc66ae23fa27cba Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 10:21:21 +0530 Subject: [PATCH 08/36] feat: updated client search, timeout handled --- internal/elasticsearch/v8/search.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index 6e76068..256811f 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -19,17 +19,13 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/elastic/go-elasticsearch/esapi" ) -const ( - // metadataFieldIndex is a name of a record metadata field that stores a ElasticSearch Index name. - metadataFieldIndex = "elasticsearch.index" -) - -// SearchReponse is the JSON response from Elasticsearch search query. -type SearchReponse struct { +// SearchResponse is the JSON response from Elasticsearch search query. +type SearchResponse struct { Hits struct { Total struct { Value int `json:"value"` @@ -37,13 +33,12 @@ type SearchReponse struct { Hits []struct { Index string `json:"index"` ID string `json:"_id"` - Score float64 `json:"_score"` Source map[string]any `json:"_source"` } `json:"hits"` } `json:"hits"` } -// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. +// Search calls the elasticsearch search api and retuns SearchResponse read from an index. func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { // Create the search request req := esapi.SearchRequest{ @@ -58,6 +53,7 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (i } // Perform the request + ctx, _ = context.WithTimeout(ctx, 5*time.Second) res, err := req.Do(ctx, c.es) if err != nil { return nil, fmt.Errorf("error getting search response: %s", err) @@ -68,7 +64,7 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (i return nil, fmt.Errorf("error search response: %s", res.String()) } - var response SearchReponse + var response SearchResponse if err := json.NewDecoder(res.Body).Decode(&response); err != nil { return nil, fmt.Errorf("error parsing the search response body: %s", err) } From 5bf741068b04f35fc0f68b53d1feb7d331e31b8a Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 10:36:17 +0530 Subject: [PATCH 09/36] feat: worker definition --- source/worker.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 source/worker.go diff --git a/source/worker.go b/source/worker.go new file mode 100644 index 0000000..236e562 --- /dev/null +++ b/source/worker.go @@ -0,0 +1,81 @@ +package source + +import ( + "context" + "encoding/json" + "log" + "time" + + v8 "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/v8" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +const ( + // metadataFieldIndex is a name of a record metadata field that stores a ElasticSearch Index name. + metadataFieldIndex = "elasticsearch.index" +) + +type Worker struct { + source *Source + index string + offset int +} + +func NewWorker(source *Source, index string, offset int) { + worker := &Worker{ + source: source, + index: index, + offset: offset, + } + + go worker.start() +} + +func (w *Worker) start() { + for { + response, err := w.source.client.Search(context.Background(), w.index, &w.offset, &w.source.config.BatchSize) + if err != nil { + log.Println("search() err:", err) + time.Sleep(1 * time.Second) + continue + } + + res, ok := response.(v8.SearchResponse) + if !ok { + // TODO + // return nil, fmt.Errorf("invalid search response") + } + + for _, hit := range res.Hits.Hits { + metadata := opencdc.Metadata{ + metadataFieldIndex: hit.Index, + } + metadata.SetCreatedAt(time.Now().UTC()) + + payload, err := json.Marshal(hit.Source) + if err != nil { + // log + continue + } + + position := Position{ + ID: hit.ID, + Index: hit.Index, + Pos: w.offset + 1, + } + sdkPosition, err := position.marshal() + if err != nil { + // handle + } + + key := make(opencdc.StructuredData) + key["id"] = hit.ID + + record := sdk.Util.Source.NewRecordCreate(sdkPosition, metadata, key, opencdc.RawData(payload)) + + w.source.ch <- record + w.offset++ + } + } +} From b2bb72c8ad2568631d31b98d49f81991f9e83e36 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 11:11:56 +0530 Subject: [PATCH 10/36] feat: source read --- source/source.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/source/source.go b/source/source.go index 6d45748..fc72931 100644 --- a/source/source.go +++ b/source/source.go @@ -101,7 +101,30 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { // Read returns the next record. func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { sdk.Logger(ctx).Debug().Msg("Reading a record from ElasticSearch Source...") - return opencdc.Record{}, nil + + if s == nil || s.ch == nil { + return opencdc.Record{}, fmt.Errorf("error source not opened for reading") + } + + record, ok := <-s.ch + if !ok { + return opencdc.Record{}, fmt.Errorf("error reading data") + } + + index, ok := record.Metadata["metadataFieldIndex"] + if !ok { + // this should never happen + return opencdc.Record{}, fmt.Errorf("error index not found in data header") + } + + offset, ok := s.offsets[index] + if !ok { + // this should never happen + return opencdc.Record{}, fmt.Errorf("error offset index not found") + } + + s.offsets[index] = offset + 1 + return record, nil } // Ack logs the debug event with the position. From adc8acf07bda6016426a5add70004a14b9f418fa Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 11:24:45 +0530 Subject: [PATCH 11/36] feat: unmarshal position --- source/position.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/position.go b/source/position.go index 823575d..4f82f51 100644 --- a/source/position.go +++ b/source/position.go @@ -52,3 +52,12 @@ func (p Position) marshal() (opencdc.Position, error) { return positionBytes, nil } + +// unmarshal unmarshals opencdc.Position and retuns an error on failure. +func (p Position) unmarshal(position opencdc.Position) error { + err := json.Unmarshal(position, &p) + if err != nil { + return fmt.Errorf("unmarshal position: %w", err) + } + return nil +} From bfe0ca30b3a2bfe73ddf78f10f5de1dad37f4b99 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 11:39:29 +0530 Subject: [PATCH 12/36] feat: source ack --- source/source.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/source/source.go b/source/source.go index fc72931..4c51971 100644 --- a/source/source.go +++ b/source/source.go @@ -129,9 +129,23 @@ func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { // Ack logs the debug event with the position. func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { - sdk.Logger(ctx).Trace(). - Str("position", string(position)). - Msg("got ack") + pos := Position{} + err := pos.unmarshal(position) + if err != nil { + return fmt.Errorf("error unmarshaling opencdc position: %w", err) + } + + last := s.offsets[pos.Index] + + for _, p := range s.positions { + if p.Index == pos.Index && p.Pos > pos.Pos { + return fmt.Errorf("error acknowledging: position less than initial sdk position") + } + } + + if last < pos.Pos { + return fmt.Errorf("error acknowledging: record not read") + } return nil } From 86b607097a309c335581575ee3abd8529054e0dc Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 12:23:50 +0530 Subject: [PATCH 13/36] feat: source teardown --- source/source.go | 15 ++++++++++----- source/worker.go | 10 ++++++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/source.go b/source/source.go index 4c51971..12d2f01 100644 --- a/source/source.go +++ b/source/source.go @@ -32,6 +32,7 @@ type Source struct { offsets map[string]int positions []Position ch chan opencdc.Record + shutdown chan struct{} } // NewSource initialises a new source. @@ -79,6 +80,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { } s.ch = make(chan opencdc.Record, s.config.BatchSize) + s.shutdown = make(chan struct{}) s.offsets = make(map[string]int) for _, index := range s.config.Indexes { @@ -154,11 +156,14 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - // if s.iterator != nil { - // if err := s.iterator.Stop(); err != nil { - // return fmt.Errorf("stop iterator: %w", err) - // } - // } + if s == nil || s.ch == nil { + return fmt.Errorf("error source not opened for teardown") + } + + close(s.shutdown) + close(s.ch) + // reset read channel to nil, to avoid reading buffered records + s.ch = nil return nil } diff --git a/source/worker.go b/source/worker.go index 236e562..c8ffc6a 100644 --- a/source/worker.go +++ b/source/worker.go @@ -74,8 +74,14 @@ func (w *Worker) start() { record := sdk.Util.Source.NewRecordCreate(sdkPosition, metadata, key, opencdc.RawData(payload)) - w.source.ch <- record - w.offset++ + select { + case w.source.ch <- record: + w.offset++ + + case <-w.source.shutdown: + log.Println("Stopping worker...") + return + } } } } From 113656824d9a6d238ef8bf841d93994192b8d343 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 12:28:40 +0530 Subject: [PATCH 14/36] feat: graceful shutdown of workers --- source/source.go | 10 +++++++++- source/worker.go | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/source/source.go b/source/source.go index 12d2f01..7246058 100644 --- a/source/source.go +++ b/source/source.go @@ -17,6 +17,7 @@ package source import ( "context" "fmt" + "sync" "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch" "github.com/conduitio/conduit-commons/config" @@ -33,6 +34,7 @@ type Source struct { positions []Position ch chan opencdc.Record shutdown chan struct{} + wg *sync.WaitGroup } // NewSource initialises a new source. @@ -82,8 +84,11 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.ch = make(chan opencdc.Record, s.config.BatchSize) s.shutdown = make(chan struct{}) s.offsets = make(map[string]int) + s.wg = &sync.WaitGroup{} for _, index := range s.config.Indexes { + s.wg.Add(1) + offset := 0 for _, position := range s.positions { if index == position.Index { @@ -161,8 +166,11 @@ func (s *Source) Teardown(ctx context.Context) error { } close(s.shutdown) - close(s.ch) + // wait for goroutines to finish + s.wg.Wait() + // close the read channel for write + close(s.ch) // reset read channel to nil, to avoid reading buffered records s.ch = nil return nil diff --git a/source/worker.go b/source/worker.go index c8ffc6a..33ca180 100644 --- a/source/worker.go +++ b/source/worker.go @@ -33,6 +33,8 @@ func NewWorker(source *Source, index string, offset int) { } func (w *Worker) start() { + defer w.source.wg.Done() + for { response, err := w.source.client.Search(context.Background(), w.index, &w.offset, &w.source.config.BatchSize) if err != nil { From 61bc43c01b9b475d4503b8800c6750b2fbb5af76 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Tue, 8 Oct 2024 13:12:42 +0530 Subject: [PATCH 15/36] feat: polling --- source/config.go | 6 +++++- source/paramgen.go | 7 +++++++ source/worker.go | 25 ++++++++++++++++--------- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/source/config.go b/source/config.go index a2af5ca..0581eb0 100644 --- a/source/config.go +++ b/source/config.go @@ -17,6 +17,8 @@ package source import ( + "time" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch" ) @@ -40,5 +42,7 @@ type Config struct { // The name of the indexes to read data from. Indexes []string `json:"index"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. - BatchSize uint64 `json:"batchSize" default:"1000"` + BatchSize int `json:"batchSize" default:"1000"` + // This period is used by workers to poll for new data at regular intervals. + PollingPeriod time.Duration } diff --git a/source/paramgen.go b/source/paramgen.go index e60772d..56bebe8 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -15,6 +15,7 @@ const ( ConfigHost = "host" ConfigIndex = "index" ConfigPassword = "password" + ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" ConfigUsername = "username" ConfigVersion = "version" @@ -66,6 +67,12 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, + ConfigPollingPeriod: { + Default: "", + Description: "This period is used by workers to poll for new data at regular intervals.", + Type: config.ParameterTypeDuration, + Validations: []config.Validation{}, + }, ConfigServiceToken: { Default: "", Description: "Service token for authorization; if set, overrides username/password.", diff --git a/source/worker.go b/source/worker.go index 33ca180..8b1ecd5 100644 --- a/source/worker.go +++ b/source/worker.go @@ -37,16 +37,23 @@ func (w *Worker) start() { for { response, err := w.source.client.Search(context.Background(), w.index, &w.offset, &w.source.config.BatchSize) - if err != nil { - log.Println("search() err:", err) - time.Sleep(1 * time.Second) - continue - } - res, ok := response.(v8.SearchResponse) - if !ok { - // TODO - // return nil, fmt.Errorf("invalid search response") + if err != nil || len(res.Hits.Hits) == 0 || !ok { + if err != nil { + log.Println("search() err:", err) + } + if !ok { + log.Println("invalid response") + } + + select { + case <-w.source.shutdown: + log.Println("shuting down..") + return + + case <-time.After(w.source.config.PollingPeriod): + continue + } } for _, hit := range res.Hits.Hits { From 2099181479b50659f5f93f996ccb46ba8037cff3 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Wed, 9 Oct 2024 12:27:40 +0530 Subject: [PATCH 16/36] fix: es client search definition --- internal/elasticsearch/api/search.go | 29 ++++++++++++++++++++++++++++ internal/elasticsearch/client.go | 5 +++-- internal/elasticsearch/v5/client.go | 6 ++++-- internal/elasticsearch/v6/client.go | 6 ++++-- internal/elasticsearch/v7/client.go | 6 ++++-- internal/elasticsearch/v8/search.go | 20 ++++--------------- source/worker.go | 16 ++++++--------- 7 files changed, 54 insertions(+), 34 deletions(-) create mode 100644 internal/elasticsearch/api/search.go diff --git a/internal/elasticsearch/api/search.go b/internal/elasticsearch/api/search.go new file mode 100644 index 0000000..e4ad27a --- /dev/null +++ b/internal/elasticsearch/api/search.go @@ -0,0 +1,29 @@ +// Copyright © 2024 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +// SearchResponse is the JSON response from Elasticsearch search query. +type SearchResponse struct { + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []struct { + Index string `json:"index"` + ID string `json:"_id"` + Source map[string]any `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index 217aedd..e4cf0c2 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -18,6 +18,7 @@ import ( "context" "io" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" ) @@ -40,6 +41,6 @@ type Client interface { // PrepareDeleteOperation prepares delete operation definition for Bulk API query. PrepareDeleteOperation(key string) (metadata interface{}, err error) - // Search calls the elasticsearch search api and retuns a list of records read from an index. - Search(ctx context.Context, index string, offset, size *int) (interface{}, error) + // Search calls the elasticsearch search api and retuns SearchResponse read from an index. + Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) } diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index 2f2e14a..f70cc7c 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -21,7 +21,9 @@ import ( "fmt" "io" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" + "github.com/elastic/go-elasticsearch/v5" ) @@ -160,8 +162,8 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } } -// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { +// Search calls the elasticsearch search api and retuns SearchResponse read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil } diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index d94c7d0..2ded404 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -21,7 +21,9 @@ import ( "fmt" "io" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" + "github.com/elastic/go-elasticsearch/v6" ) @@ -163,8 +165,8 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } } -// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { +// Search calls the elasticsearch search api and retuns SearchResponse read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil } diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index f63fb2c..d93e2a5 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -21,7 +21,9 @@ import ( "fmt" "io" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" + "github.com/elastic/go-elasticsearch/v7" ) @@ -162,8 +164,8 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } } -// Search calls the elasticsearch search api and retuns a list of opencdc.Record read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { +// Search calls the elasticsearch search api and retuns SearchResponse read from an index. +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil } diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index 256811f..a8d7d9a 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -21,25 +21,13 @@ import ( "strings" "time" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" + "github.com/elastic/go-elasticsearch/esapi" ) -// SearchResponse is the JSON response from Elasticsearch search query. -type SearchResponse struct { - Hits struct { - Total struct { - Value int `json:"value"` - } `json:"total"` - Hits []struct { - Index string `json:"index"` - ID string `json:"_id"` - Source map[string]any `json:"_source"` - } `json:"hits"` - } `json:"hits"` -} - // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (interface{}, error) { +func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { // Create the search request req := esapi.SearchRequest{ Index: []string{index}, @@ -64,7 +52,7 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (i return nil, fmt.Errorf("error search response: %s", res.String()) } - var response SearchResponse + var response *api.SearchResponse if err := json.NewDecoder(res.Body).Decode(&response); err != nil { return nil, fmt.Errorf("error parsing the search response body: %s", err) } diff --git a/source/worker.go b/source/worker.go index 8b1ecd5..864b93d 100644 --- a/source/worker.go +++ b/source/worker.go @@ -6,14 +6,13 @@ import ( "log" "time" - v8 "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/v8" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" ) const ( // metadataFieldIndex is a name of a record metadata field that stores a ElasticSearch Index name. - metadataFieldIndex = "elasticsearch.index" + metadataFieldIndex = "opencdc.collection" ) type Worker struct { @@ -37,18 +36,14 @@ func (w *Worker) start() { for { response, err := w.source.client.Search(context.Background(), w.index, &w.offset, &w.source.config.BatchSize) - res, ok := response.(v8.SearchResponse) - if err != nil || len(res.Hits.Hits) == 0 || !ok { + if err != nil || len(response.Hits.Hits) == 0 { if err != nil { log.Println("search() err:", err) } - if !ok { - log.Println("invalid response") - } select { case <-w.source.shutdown: - log.Println("shuting down..") + log.Println("worker shutting down...") return case <-time.After(w.source.config.PollingPeriod): @@ -56,7 +51,7 @@ func (w *Worker) start() { } } - for _, hit := range res.Hits.Hits { + for _, hit := range response.Hits.Hits { metadata := opencdc.Metadata{ metadataFieldIndex: hit.Index, } @@ -76,6 +71,7 @@ func (w *Worker) start() { sdkPosition, err := position.marshal() if err != nil { // handle + continue } key := make(opencdc.StructuredData) @@ -88,7 +84,7 @@ func (w *Worker) start() { w.offset++ case <-w.source.shutdown: - log.Println("Stopping worker...") + log.Println("worker shutting down...") return } } From f0b894ba7206a0539dfaf25a90379acfac8ab3c6 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Wed, 9 Oct 2024 12:47:16 +0530 Subject: [PATCH 17/36] fix: linters --- README.md | 2 +- destination/client_moq_test.go | 63 +++++++++++++++++++++++++++++ internal/elasticsearch/v5/client.go | 4 +- internal/elasticsearch/v6/client.go | 4 +- internal/elasticsearch/v7/client.go | 4 +- internal/elasticsearch/v8/search.go | 8 ++-- source/config.go | 2 +- source/position.go | 2 +- source/source.go | 4 +- source/worker.go | 14 +++++++ 10 files changed, 92 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 2f2f921..88a22de 100644 --- a/README.md +++ b/README.md @@ -58,4 +58,4 @@ docker-compose -f test/docker-compose.v8.overrides.yml -f test/docker-compose.v8 - https://github.com/elastic/go-elasticsearch - https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html -![scarf pixel](https://static.scarf.sh/a.png?x-pxid=715ebf4a-148c-44ad-8f64-6cc5780d34ae) +![scarf pixel](https://static.scarf.sh/a.png?x-pxid=) diff --git a/destination/client_moq_test.go b/destination/client_moq_test.go index 41f9fe0..ac99814 100644 --- a/destination/client_moq_test.go +++ b/destination/client_moq_test.go @@ -5,6 +5,7 @@ package destination import ( "context" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" "io" "sync" @@ -35,6 +36,9 @@ var _ client = &clientMock{} // PrepareUpsertOperationFunc: func(key string, item opencdc.Record) (interface{}, interface{}, error) { // panic("mock out the PrepareUpsertOperation method") // }, +// SearchFunc: func(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) { +// panic("mock out the Search method") +// }, // } // // // use mockedclient in code that requires client @@ -57,6 +61,9 @@ type clientMock struct { // PrepareUpsertOperationFunc mocks the PrepareUpsertOperation method. PrepareUpsertOperationFunc func(key string, item opencdc.Record) (interface{}, interface{}, error) + // SearchFunc mocks the Search method. + SearchFunc func(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) + // calls tracks calls to the methods. calls struct { // Bulk holds details about calls to the Bulk method. @@ -88,12 +95,24 @@ type clientMock struct { // Item is the item argument value. Item opencdc.Record } + // Search holds details about calls to the Search method. + Search []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Index is the index argument value. + Index string + // Offset is the offset argument value. + Offset *int + // Size is the size argument value. + Size *int + } } lockBulk sync.RWMutex lockPing sync.RWMutex lockPrepareCreateOperation sync.RWMutex lockPrepareDeleteOperation sync.RWMutex lockPrepareUpsertOperation sync.RWMutex + lockSearch sync.RWMutex } // Bulk calls BulkFunc. @@ -263,3 +282,47 @@ func (mock *clientMock) PrepareUpsertOperationCalls() []struct { mock.lockPrepareUpsertOperation.RUnlock() return calls } + +// Search calls SearchFunc. +func (mock *clientMock) Search(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) { + if mock.SearchFunc == nil { + panic("clientMock.SearchFunc: method is nil but client.Search was just called") + } + callInfo := struct { + Ctx context.Context + Index string + Offset *int + Size *int + }{ + Ctx: ctx, + Index: index, + Offset: offset, + Size: size, + } + mock.lockSearch.Lock() + mock.calls.Search = append(mock.calls.Search, callInfo) + mock.lockSearch.Unlock() + return mock.SearchFunc(ctx, index, offset, size) +} + +// SearchCalls gets all the calls that were made to Search. +// Check the length with: +// +// len(mockedclient.SearchCalls()) +func (mock *clientMock) SearchCalls() []struct { + Ctx context.Context + Index string + Offset *int + Size *int +} { + var calls []struct { + Ctx context.Context + Index string + Offset *int + Size *int + } + mock.lockSearch.RLock() + calls = mock.calls.Search + mock.lockSearch.RUnlock() + return calls +} diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index f70cc7c..e2f6ba0 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -163,7 +163,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil + return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index 2ded404..d189496 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -166,7 +166,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil + return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index d93e2a5..034a9ef 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -165,7 +165,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil + return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index a8d7d9a..f0e003d 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -31,11 +31,11 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* // Create the search request req := esapi.SearchRequest{ Index: []string{index}, - Body: strings.NewReader(fmt.Sprintf(`{ + Body: strings.NewReader(`{ "query": { "match_all": {} } - }`)), + }`), From: offset, Size: size, } @@ -44,7 +44,7 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* ctx, _ = context.WithTimeout(ctx, 5*time.Second) res, err := req.Do(ctx, c.es) if err != nil { - return nil, fmt.Errorf("error getting search response: %s", err) + return nil, fmt.Errorf("error getting search response: %w", err) } defer res.Body.Close() @@ -54,7 +54,7 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* var response *api.SearchResponse if err := json.NewDecoder(res.Body).Decode(&response); err != nil { - return nil, fmt.Errorf("error parsing the search response body: %s", err) + return nil, fmt.Errorf("error parsing the search response body: %w", err) } return response, nil diff --git a/source/config.go b/source/config.go index 0581eb0..40286a6 100644 --- a/source/config.go +++ b/source/config.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Meroxa, Inc. and Miquido +// Copyright © 2024 Meroxa, Inc. and Miquido // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/source/position.go b/source/position.go index 4f82f51..8f9bd5a 100644 --- a/source/position.go +++ b/source/position.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Meroxa, Inc. and Miquido +// Copyright © 2024 Meroxa, Inc. and Miquido // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/source/source.go b/source/source.go index 7246058..a8cd8c0 100644 --- a/source/source.go +++ b/source/source.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Meroxa, Inc. and Miquido +// Copyright © 2024 Meroxa, Inc. and Miquido // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -135,7 +135,7 @@ func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { } // Ack logs the debug event with the position. -func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { +func (s *Source) Ack(_ context.Context, position opencdc.Position) error { pos := Position{} err := pos.unmarshal(position) if err != nil { diff --git a/source/worker.go b/source/worker.go index 864b93d..348bde5 100644 --- a/source/worker.go +++ b/source/worker.go @@ -1,3 +1,17 @@ +// Copyright © 2024 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package source import ( From c1c0522845c9a4b7d6d3138aa9063e35a1e16d2a Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Wed, 9 Oct 2024 14:08:49 +0530 Subject: [PATCH 18/36] fix: cleanup --- source/config.go | 2 +- source/paramgen.go | 2 +- source/source.go | 8 +++++--- source/worker.go | 2 ++ 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/source/config.go b/source/config.go index 40286a6..b222f76 100644 --- a/source/config.go +++ b/source/config.go @@ -44,5 +44,5 @@ type Config struct { // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. - PollingPeriod time.Duration + PollingPeriod time.Duration `json:"pollingPeriod" default:"1s"` } diff --git a/source/paramgen.go b/source/paramgen.go index 56bebe8..2c2af7d 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -68,7 +68,7 @@ func (Config) Parameters() map[string]config.Parameter { Validations: []config.Validation{}, }, ConfigPollingPeriod: { - Default: "", + Default: "1s", Description: "This period is used by workers to poll for new data at regular intervals.", Type: config.ParameterTypeDuration, Validations: []config.Validation{}, diff --git a/source/source.go b/source/source.go index a8cd8c0..1b3932f 100644 --- a/source/source.go +++ b/source/source.go @@ -28,9 +28,11 @@ import ( type Source struct { sdk.UnimplementedSource - config Config - client elasticsearch.Client - offsets map[string]int + config Config + client elasticsearch.Client + // holds the last position of indexes returned by Read() method + offsets map[string]int + // hold the initial sdk position of indexes, used for Ack() method positions []Position ch chan opencdc.Record shutdown chan struct{} diff --git a/source/worker.go b/source/worker.go index 348bde5..2d4b4f4 100644 --- a/source/worker.go +++ b/source/worker.go @@ -35,6 +35,7 @@ type Worker struct { offset int } +// NewWorker create a new worker goroutine and starts polling elasticsearch for new records func NewWorker(source *Source, index string, offset int) { worker := &Worker{ source: source, @@ -45,6 +46,7 @@ func NewWorker(source *Source, index string, offset int) { go worker.start() } +// start polls elasticsearch for new records and writes it into the source channel func (w *Worker) start() { defer w.source.wg.Done() From a9be98ff82dc3d3ce88b17ce1961643da9de0a0e Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Wed, 9 Oct 2024 16:28:53 +0530 Subject: [PATCH 19/36] fix: add missing tag in config --- source/config.go | 2 +- source/paramgen.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/config.go b/source/config.go index b222f76..435f2c0 100644 --- a/source/config.go +++ b/source/config.go @@ -40,7 +40,7 @@ type Config struct { // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. - Indexes []string `json:"index"` + Indexes []string `json:"indexes"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. diff --git a/source/paramgen.go b/source/paramgen.go index 2c2af7d..3958a49 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -13,7 +13,7 @@ const ( ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" - ConfigIndex = "index" + ConfigIndexes = "indexes" ConfigPassword = "password" ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" @@ -55,7 +55,7 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndex: { + ConfigIndexes: { Default: "", Description: "The name of the indexes to read data from.", Type: config.ParameterTypeString, From d36199ef3e84566320f8bd28bceaa9406a814699 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 10 Oct 2024 10:23:43 +0530 Subject: [PATCH 20/36] fix: small fix in record metadata --- connector.go | 3 ++- internal/elasticsearch/api/search.go | 5 +--- source/config.go | 34 +++++++++++++++++++++++++++- source/source.go | 2 +- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/connector.go b/connector.go index ff3ee24..cce710e 100644 --- a/connector.go +++ b/connector.go @@ -16,11 +16,12 @@ package elasticsearch import ( "github.com/conduitio-labs/conduit-connector-elasticsearch/destination" + "github.com/conduitio-labs/conduit-connector-elasticsearch/source" sdk "github.com/conduitio/conduit-connector-sdk" ) var Connector = sdk.Connector{ NewSpecification: Specification, - NewSource: nil, + NewSource: source.NewSource, NewDestination: destination.NewDestination, } diff --git a/internal/elasticsearch/api/search.go b/internal/elasticsearch/api/search.go index e4ad27a..9002d94 100644 --- a/internal/elasticsearch/api/search.go +++ b/internal/elasticsearch/api/search.go @@ -17,11 +17,8 @@ package api // SearchResponse is the JSON response from Elasticsearch search query. type SearchResponse struct { Hits struct { - Total struct { - Value int `json:"value"` - } `json:"total"` Hits []struct { - Index string `json:"index"` + Index string `json:"_index"` ID string `json:"_id"` Source map[string]any `json:"_source"` } `json:"hits"` diff --git a/source/config.go b/source/config.go index 435f2c0..1a31803 100644 --- a/source/config.go +++ b/source/config.go @@ -41,8 +41,40 @@ type Config struct { CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. Indexes []string `json:"indexes"` - // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`. + // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. PollingPeriod time.Duration `json:"pollingPeriod" default:"1s"` } + +func (c Config) GetHost() string { + return c.Host +} + +func (c Config) GetUsername() string { + return c.Username +} + +func (c Config) GetPassword() string { + return c.Password +} + +func (c Config) GetCloudID() string { + return c.CloudID +} + +func (c Config) GetAPIKey() string { + return c.APIKey +} + +func (c Config) GetServiceToken() string { + return c.ServiceToken +} + +func (c Config) GetCertificateFingerprint() string { + return c.CertificateFingerprint +} + +func (c Config) GetIndex() string { + return "" // Only for Config to implement the elasticsearch/internal/config +} diff --git a/source/source.go b/source/source.go index 1b3932f..1c96970 100644 --- a/source/source.go +++ b/source/source.go @@ -120,7 +120,7 @@ func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { return opencdc.Record{}, fmt.Errorf("error reading data") } - index, ok := record.Metadata["metadataFieldIndex"] + index, ok := record.Metadata[metadataFieldIndex] if !ok { // this should never happen return opencdc.Record{}, fmt.Errorf("error index not found in data header") From 5a91d3b968488f2c31effac31b856be4e29d6f66 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 10 Oct 2024 18:38:35 +0530 Subject: [PATCH 21/36] feat: updated position --- source/position.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/source/position.go b/source/position.go index 8f9bd5a..b5fa166 100644 --- a/source/position.go +++ b/source/position.go @@ -17,34 +17,36 @@ package source import ( "encoding/json" "fmt" + "sync" "github.com/conduitio/conduit-commons/opencdc" ) // Position represents position of a document in an index. type Position struct { - ID string `json:"id"` - Index string `json:"index"` - Pos int `json:"pos"` + mu sync.Mutex + IndexPositions map[string]int `json:"indexPositions"` } // ParseSDKPosition parses opencdc.Position and returns Position. -func ParseSDKPosition(position opencdc.Position) ([]Position, error) { - var pos []Position +func ParseSDKPosition(position opencdc.Position) (*Position, error) { + var pos Position if position == nil { - return pos, nil + return &pos, nil } if err := json.Unmarshal(position, &pos); err != nil { - return pos, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) + return &pos, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) } - return pos, nil + return &pos, nil } // marshal marshals Position and returns opencdc.Position or an error. -func (p Position) marshal() (opencdc.Position, error) { +func (p *Position) marshal() (opencdc.Position, error) { + p.mu.Lock() + defer p.mu.Unlock() positionBytes, err := json.Marshal(p) if err != nil { return nil, fmt.Errorf("marshal position: %w", err) @@ -53,11 +55,8 @@ func (p Position) marshal() (opencdc.Position, error) { return positionBytes, nil } -// unmarshal unmarshals opencdc.Position and retuns an error on failure. -func (p Position) unmarshal(position opencdc.Position) error { - err := json.Unmarshal(position, &p) - if err != nil { - return fmt.Errorf("unmarshal position: %w", err) - } - return nil +func (p *Position) set(index string, pos int) { + p.mu.Lock() + defer p.mu.Unlock() + p.IndexPositions[index] = pos } From 862243e11e239cfe8405dde2a98d0cdbcfa524c2 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 10 Oct 2024 20:01:33 +0530 Subject: [PATCH 22/36] feat: cleanup --- source/source.go | 79 ++++++++++++------------------------------------ source/worker.go | 29 ++++++++---------- 2 files changed, 33 insertions(+), 75 deletions(-) diff --git a/source/source.go b/source/source.go index 1c96970..18939d4 100644 --- a/source/source.go +++ b/source/source.go @@ -28,15 +28,12 @@ import ( type Source struct { sdk.UnimplementedSource - config Config - client elasticsearch.Client - // holds the last position of indexes returned by Read() method - offsets map[string]int - // hold the initial sdk position of indexes, used for Ack() method - positions []Position - ch chan opencdc.Record - shutdown chan struct{} - wg *sync.WaitGroup + config Config + client elasticsearch.Client + position *Position + ch chan opencdc.Record + shutdown chan struct{} + wg *sync.WaitGroup } // NewSource initialises a new source. @@ -67,9 +64,16 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { sdk.Logger(ctx).Info().Msg("Opening an ElasticSearch Source...") var err error - s.positions, err = ParseSDKPosition(position) - if err != nil { - return err + + if position == nil { + s.position = &Position{ + IndexPositions: make(map[string]int), + } + } else { + s.position, err = ParseSDKPosition(position) + if err != nil { + return err + } } // Initialize Elasticsearch client @@ -85,23 +89,14 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.ch = make(chan opencdc.Record, s.config.BatchSize) s.shutdown = make(chan struct{}) - s.offsets = make(map[string]int) s.wg = &sync.WaitGroup{} for _, index := range s.config.Indexes { s.wg.Add(1) - - offset := 0 - for _, position := range s.positions { - if index == position.Index { - offset = position.Pos - } - } - - s.offsets[index] = offset + offset, _ := s.position.IndexPositions[index] // a new worker for a new index - NewWorker(s, index, offset) + NewWorker(ctx, s, index, offset) } return nil @@ -120,42 +115,12 @@ func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { return opencdc.Record{}, fmt.Errorf("error reading data") } - index, ok := record.Metadata[metadataFieldIndex] - if !ok { - // this should never happen - return opencdc.Record{}, fmt.Errorf("error index not found in data header") - } - - offset, ok := s.offsets[index] - if !ok { - // this should never happen - return opencdc.Record{}, fmt.Errorf("error offset index not found") - } - - s.offsets[index] = offset + 1 return record, nil } // Ack logs the debug event with the position. -func (s *Source) Ack(_ context.Context, position opencdc.Position) error { - pos := Position{} - err := pos.unmarshal(position) - if err != nil { - return fmt.Errorf("error unmarshaling opencdc position: %w", err) - } - - last := s.offsets[pos.Index] - - for _, p := range s.positions { - if p.Index == pos.Index && p.Pos > pos.Pos { - return fmt.Errorf("error acknowledging: position less than initial sdk position") - } - } - - if last < pos.Pos { - return fmt.Errorf("error acknowledging: record not read") - } - +func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Trace().Str("position", string(position)).Msg("got ack") return nil } @@ -163,10 +128,6 @@ func (s *Source) Ack(_ context.Context, position opencdc.Position) error { func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - if s == nil || s.ch == nil { - return fmt.Errorf("error source not opened for teardown") - } - close(s.shutdown) // wait for goroutines to finish diff --git a/source/worker.go b/source/worker.go index 2d4b4f4..c5cda48 100644 --- a/source/worker.go +++ b/source/worker.go @@ -35,23 +35,23 @@ type Worker struct { offset int } -// NewWorker create a new worker goroutine and starts polling elasticsearch for new records -func NewWorker(source *Source, index string, offset int) { +// NewWorker create a new worker goroutine and starts polling elasticsearch for new records. +func NewWorker(ctx context.Context, source *Source, index string, offset int) { worker := &Worker{ source: source, index: index, offset: offset, } - go worker.start() + go worker.start(ctx) } -// start polls elasticsearch for new records and writes it into the source channel -func (w *Worker) start() { +// start polls elasticsearch for new records and writes it into the source channel. +func (w *Worker) start(ctx context.Context) { defer w.source.wg.Done() for { - response, err := w.source.client.Search(context.Background(), w.index, &w.offset, &w.source.config.BatchSize) + response, err := w.source.client.Search(ctx, w.index, &w.offset, &w.source.config.BatchSize) if err != nil || len(response.Hits.Hits) == 0 { if err != nil { log.Println("search() err:", err) @@ -59,7 +59,7 @@ func (w *Worker) start() { select { case <-w.source.shutdown: - log.Println("worker shutting down...") + sdk.Logger(ctx).Debug().Msg("worker shutting down...") return case <-time.After(w.source.config.PollingPeriod): @@ -75,18 +75,15 @@ func (w *Worker) start() { payload, err := json.Marshal(hit.Source) if err != nil { - // log + sdk.Logger(ctx).Err(err).Msg("error marshal payload") continue } - position := Position{ - ID: hit.ID, - Index: hit.Index, - Pos: w.offset + 1, - } - sdkPosition, err := position.marshal() + w.source.position.set(hit.Index, w.offset+1) + + sdkPosition, err := w.source.position.marshal() if err != nil { - // handle + sdk.Logger(ctx).Err(err).Msg("error marshal position") continue } @@ -100,7 +97,7 @@ func (w *Worker) start() { w.offset++ case <-w.source.shutdown: - log.Println("worker shutting down...") + sdk.Logger(ctx).Debug().Msg("worker shutting down...") return } } From ed485c46f21a3427200b6be9ee0b58d3b781bf41 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 11 Oct 2024 12:41:34 +0530 Subject: [PATCH 23/36] feat: added source in readme --- README.md | 23 ++++++++++++++++++++++- source/config.go | 2 +- source/paramgen.go | 4 ++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 88a22de..e59bc07 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,26 @@ For any other action a warning entry is added to log and Record is skipped. | `bulkSize` | The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. Note that values greater than `1000` may require additional service configuration. | `true` | `"1000"` | | `retries` | The maximum number of retries of failed operations. The minimum value is `0` which disabled retry logic. The maximum value is `255`. Note that the higher value, the longer it may take to process retries, as a result, ingest next operations. | `true` | `"1000"` | + +# Source +ElasticSearch source connector allows you to move data from multiple Elasticsearch indexes with the specified `host` and `indexes`. It uses elasticsearch search api to pull data from indexes. Upon starting it pulls batches of data from indexes, once all the data is retrieved, it then polls the search api to pull data at regular intervals. + +## Configuration Options +| name | description | required | default | +|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|----------| +| `version` | The version of the Elasticsearch service. One of: `5`, `6`, `7`, `8`. | `true` | | +| `host` | The Elasticsearch host and port (e.g.: http://127.0.0.1:9200). | `true` | | +| `username` | [v: 5, 6, 7, 8] The username for HTTP Basic Authentication. | `false` | | +| `password` | [v: 5, 6, 7, 8] The password for HTTP Basic Authentication. | `true` when username was provided, `false` otherwise | | +| `cloudId` | [v: 6, 7, 8] Endpoint for the Elastic Service (https://elastic.co/cloud). | `false` | | +| `apiKey` | [v: 6, 7, 8] Base64-encoded token for authorization; if set, overrides username/password and service token. | `false` | | +| `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | | +| `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | | +| `indexes` | The name of the indexes to read the data from. | `true` | | +| `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` | +| `pollingPeriod` | The duration for polling the search api for fetching new records. | `false` | `"5s"` | + + # Testing Run `make test` to run all the unit and integration tests, which require Docker to be installed and running. The command will handle starting and stopping docker containers for you. @@ -57,5 +77,6 @@ docker-compose -f test/docker-compose.v8.overrides.yml -f test/docker-compose.v8 - https://github.com/elastic/go-elasticsearch - https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html +- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-your-data.html -![scarf pixel](https://static.scarf.sh/a.png?x-pxid=) +![scarf pixel](https://static.scarf.sh/a.png?x-pxid=715ebf4a-148c-44ad-8f64-6cc5780d34ae) diff --git a/source/config.go b/source/config.go index 1a31803..dbedbdc 100644 --- a/source/config.go +++ b/source/config.go @@ -44,7 +44,7 @@ type Config struct { // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. - PollingPeriod time.Duration `json:"pollingPeriod" default:"1s"` + PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"` } func (c Config) GetHost() string { diff --git a/source/paramgen.go b/source/paramgen.go index 3958a49..a00ce89 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -31,7 +31,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigBatchSize: { Default: "1000", - Description: "The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10 000`.", + Description: "The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`.", Type: config.ParameterTypeInt, Validations: []config.Validation{}, }, @@ -68,7 +68,7 @@ func (Config) Parameters() map[string]config.Parameter { Validations: []config.Validation{}, }, ConfigPollingPeriod: { - Default: "1s", + Default: "5s", Description: "This period is used by workers to poll for new data at regular intervals.", Type: config.ParameterTypeDuration, Validations: []config.Validation{}, From e71b9070d49a44f22e2c4ada473728d85cd760b6 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 11 Oct 2024 12:42:25 +0530 Subject: [PATCH 24/36] fix: updated modules --- go.mod | 3 ++- go.sum | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6038def..a11845b 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,14 @@ toolchain go1.23.1 require ( github.com/conduitio/conduit-commons v0.3.0 github.com/conduitio/conduit-connector-sdk v0.10.1 + github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v5 v5.6.1 github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v8 v8.15.0 github.com/golangci/golangci-lint v1.61.0 github.com/jaswdr/faker v1.19.1 + github.com/matryer/is v1.4.1 github.com/matryer/moq v0.5.0 github.com/stretchr/testify v1.9.0 go.uber.org/goleak v1.3.0 @@ -130,7 +132,6 @@ require ( github.com/maratori/testableexamples v1.0.0 // indirect github.com/maratori/testpackage v1.1.1 // indirect github.com/matoous/godox v0.0.0-20240105082147-c5b5e0e7c0c0 // indirect - github.com/matryer/is v1.4.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect diff --git a/go.sum b/go.sum index a5bdd05..98c2612 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ github.com/denis-tingaikin/go-header v0.5.0 h1:SRdnP5ZKvcO9KKRP1KJrhFR3RrlGuD+42 github.com/denis-tingaikin/go-header v0.5.0/go.mod h1:mMenU5bWrok6Wl2UsZjy+1okegmwQ3UgWl4V1D8gjlY= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch/v5 v5.6.1 h1:RnL2wcXepOT5SdoKMMO1j1OBX0vxHYbBtkQNL2E3xs4= github.com/elastic/go-elasticsearch/v5 v5.6.1/go.mod h1:r7uV7HidpfkYh7D8SB4lkS13TNlNy3oa5GNmTZvuVqY= github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= From 3bd650215baf6bc5b079bb907112fb1df9820a70 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 11 Oct 2024 15:52:05 +0530 Subject: [PATCH 25/36] feat: posititon update and test --- source/position.go | 8 +-- source/position_test.go | 143 ++++++++++++++++++++++++++++++++++++++++ source/worker.go | 2 +- 3 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 source/position_test.go diff --git a/source/position.go b/source/position.go index b5fa166..6098682 100644 --- a/source/position.go +++ b/source/position.go @@ -33,11 +33,11 @@ func ParseSDKPosition(position opencdc.Position) (*Position, error) { var pos Position if position == nil { - return &pos, nil + return nil, nil } if err := json.Unmarshal(position, &pos); err != nil { - return &pos, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) + return nil, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) } return &pos, nil @@ -51,11 +51,11 @@ func (p *Position) marshal() (opencdc.Position, error) { if err != nil { return nil, fmt.Errorf("marshal position: %w", err) } - return positionBytes, nil } -func (p *Position) set(index string, pos int) { +// update updates an index position in the source position. +func (p *Position) update(index string, pos int) { p.mu.Lock() defer p.mu.Unlock() p.IndexPositions[index] = pos diff --git a/source/position_test.go b/source/position_test.go new file mode 100644 index 0000000..6c7b87b --- /dev/null +++ b/source/position_test.go @@ -0,0 +1,143 @@ +// Copyright © 2024 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "errors" + "testing" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/matryer/is" +) + +func TestParseSDKPosition(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in opencdc.Position + wantPos *Position + wantErr error + }{ + { + name: "success_position_is_nil", + in: nil, + wantPos: nil, + }, + { + name: "failure_unmarshal_error", + in: opencdc.Position("invalid"), + wantErr: errors.New("unmarshal opencdc.Position into Position: " + + "invalid character 'i' looking for beginning of value"), + }, + { + name: "success_no_error", + in: opencdc.Position(`{ + "indexPositions": { + "index": 10 + } + }`), + wantPos: &Position{IndexPositions: map[string]int{"index": 10}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + is := is.New(t) + + got, err := ParseSDKPosition(tt.in) + if tt.wantErr == nil { + is.NoErr(err) + is.Equal(got, tt.wantPos) + } else { + is.True(err != nil) + is.Equal(err.Error(), tt.wantErr.Error()) + } + }) + } +} + +func TestMarshal(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in Position + want opencdc.Position + }{ + { + name: "successful marshal", + in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + want: []byte(`{"indexPositions":{"a":1,"b":2}}`), + }, + { + name: "marshal empty map", + in: Position{IndexPositions: map[string]int{}}, + want: []byte(`{"indexPositions":{}}`), + }, + { + name: "marshal with nil map", + in: Position{IndexPositions: nil}, + want: []byte(`{"indexPositions":null}`), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + is := is.New(t) + + got, err := tt.in.marshal() + is.NoErr(err) + is.Equal(got, tt.want) + }) + } +} + +func TestUpdate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in Position + updateIndex string + updatePos int + want Position + }{ + { + name: "update existing index", + in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + updateIndex: "a", + updatePos: 10, + want: Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, + }, + { + name: "update new index", + in: Position{IndexPositions: map[string]int{"a": 1}}, + updateIndex: "b", + updatePos: 5, + want: Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + is := is.New(t) + + tt.in.update(tt.updateIndex, tt.updatePos) + is.Equal(tt.in, tt.want) + }) + } +} diff --git a/source/worker.go b/source/worker.go index c5cda48..91f7eca 100644 --- a/source/worker.go +++ b/source/worker.go @@ -79,7 +79,7 @@ func (w *Worker) start(ctx context.Context) { continue } - w.source.position.set(hit.Index, w.offset+1) + w.source.position.update(hit.Index, w.offset+1) sdkPosition, err := w.source.position.marshal() if err != nil { From f19eb7b8ea3d129d63ded6560c2b5db237f9fabb Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 11 Oct 2024 17:39:31 +0530 Subject: [PATCH 26/36] fix: linters and cleanup --- internal/elasticsearch/v8/search.go | 3 ++- source/config.go | 2 +- source/paramgen.go | 4 +++- source/position.go | 9 ++++---- source/position_test.go | 32 +++++++++++++++-------------- source/source.go | 8 ++------ 6 files changed, 30 insertions(+), 28 deletions(-) diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index f0e003d..7b43ef0 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -41,7 +41,8 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* } // Perform the request - ctx, _ = context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() res, err := req.Do(ctx, c.es) if err != nil { return nil, fmt.Errorf("error getting search response: %w", err) diff --git a/source/config.go b/source/config.go index dbedbdc..c79c520 100644 --- a/source/config.go +++ b/source/config.go @@ -40,7 +40,7 @@ type Config struct { // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. - Indexes []string `json:"indexes"` + Indexes []string `json:"indexes" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. diff --git a/source/paramgen.go b/source/paramgen.go index a00ce89..d9f45dd 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -59,7 +59,9 @@ func (Config) Parameters() map[string]config.Parameter { Default: "", Description: "The name of the indexes to read data from.", Type: config.ParameterTypeString, - Validations: []config.Validation{}, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, }, ConfigPassword: { Default: "", diff --git a/source/position.go b/source/position.go index 6098682..0383fde 100644 --- a/source/position.go +++ b/source/position.go @@ -28,14 +28,15 @@ type Position struct { IndexPositions map[string]int `json:"indexPositions"` } +// NewPosition initializes a new position when sdk position is nil. +func NewPosition() *Position { + return &Position{IndexPositions: make(map[string]int)} +} + // ParseSDKPosition parses opencdc.Position and returns Position. func ParseSDKPosition(position opencdc.Position) (*Position, error) { var pos Position - if position == nil { - return nil, nil - } - if err := json.Unmarshal(position, &pos); err != nil { return nil, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) } diff --git a/source/position_test.go b/source/position_test.go index 6c7b87b..d0bed6a 100644 --- a/source/position_test.go +++ b/source/position_test.go @@ -22,6 +22,13 @@ import ( "github.com/matryer/is" ) +func TestNewPosition(t *testing.T) { + is := is.New(t) + pos := NewPosition() + is.True(pos != nil) + is.Equal(len(pos.IndexPositions), 0) +} + func TestParseSDKPosition(t *testing.T) { t.Parallel() @@ -31,11 +38,6 @@ func TestParseSDKPosition(t *testing.T) { wantPos *Position wantErr error }{ - { - name: "success_position_is_nil", - in: nil, - wantPos: nil, - }, { name: "failure_unmarshal_error", in: opencdc.Position("invalid"), @@ -75,22 +77,22 @@ func TestMarshal(t *testing.T) { tests := []struct { name string - in Position + in *Position want opencdc.Position }{ { name: "successful marshal", - in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, want: []byte(`{"indexPositions":{"a":1,"b":2}}`), }, { name: "marshal empty map", - in: Position{IndexPositions: map[string]int{}}, + in: &Position{IndexPositions: map[string]int{}}, want: []byte(`{"indexPositions":{}}`), }, { name: "marshal with nil map", - in: Position{IndexPositions: nil}, + in: &Position{IndexPositions: nil}, want: []byte(`{"indexPositions":null}`), }, } @@ -111,24 +113,24 @@ func TestUpdate(t *testing.T) { tests := []struct { name string - in Position + in *Position updateIndex string updatePos int - want Position + want *Position }{ { name: "update existing index", - in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, updateIndex: "a", updatePos: 10, - want: Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, + want: &Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, }, { name: "update new index", - in: Position{IndexPositions: map[string]int{"a": 1}}, + in: &Position{IndexPositions: map[string]int{"a": 1}}, updateIndex: "b", updatePos: 5, - want: Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, + want: &Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, }, } for _, tt := range tests { diff --git a/source/source.go b/source/source.go index 18939d4..c2da321 100644 --- a/source/source.go +++ b/source/source.go @@ -66,9 +66,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { var err error if position == nil { - s.position = &Position{ - IndexPositions: make(map[string]int), - } + s.position = NewPosition() } else { s.position, err = ParseSDKPosition(position) if err != nil { @@ -93,7 +91,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { for _, index := range s.config.Indexes { s.wg.Add(1) - offset, _ := s.position.IndexPositions[index] + offset := s.position.IndexPositions[index] // a new worker for a new index NewWorker(ctx, s, index, offset) @@ -127,9 +125,7 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { // Teardown gracefully shutdown connector. func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - close(s.shutdown) - // wait for goroutines to finish s.wg.Wait() // close the read channel for write From ed87a325a539363db5ec4633c98662822e93f216 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 17 Oct 2024 08:56:41 +0530 Subject: [PATCH 27/36] fix: some pr comments --- source/position.go | 4 ++++ source/source.go | 11 +++-------- source/worker.go | 7 +------ 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/source/position.go b/source/position.go index 0383fde..6ee89e4 100644 --- a/source/position.go +++ b/source/position.go @@ -37,6 +37,10 @@ func NewPosition() *Position { func ParseSDKPosition(position opencdc.Position) (*Position, error) { var pos Position + if position == nil { + return NewPosition(), nil + } + if err := json.Unmarshal(position, &pos); err != nil { return nil, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) } diff --git a/source/source.go b/source/source.go index c2da321..dbe8cf0 100644 --- a/source/source.go +++ b/source/source.go @@ -64,14 +64,9 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { sdk.Logger(ctx).Info().Msg("Opening an ElasticSearch Source...") var err error - - if position == nil { - s.position = NewPosition() - } else { - s.position, err = ParseSDKPosition(position) - if err != nil { - return err - } + s.position, err = ParseSDKPosition(position) + if err != nil { + return err } // Initialize Elasticsearch client diff --git a/source/worker.go b/source/worker.go index 91f7eca..f01f3e8 100644 --- a/source/worker.go +++ b/source/worker.go @@ -24,11 +24,6 @@ import ( sdk "github.com/conduitio/conduit-connector-sdk" ) -const ( - // metadataFieldIndex is a name of a record metadata field that stores a ElasticSearch Index name. - metadataFieldIndex = "opencdc.collection" -) - type Worker struct { source *Source index string @@ -69,7 +64,7 @@ func (w *Worker) start(ctx context.Context) { for _, hit := range response.Hits.Hits { metadata := opencdc.Metadata{ - metadataFieldIndex: hit.Index, + opencdc.MetadataCollection: hit.Index, } metadata.SetCreatedAt(time.Now().UTC()) From 4a5502a08c3ef57ffae6b52dd60e5c8891bf9bea Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 17 Oct 2024 23:38:17 +0530 Subject: [PATCH 28/36] feat: search after --- destination/client_moq_test.go | 40 +++++++++--------------- internal/elasticsearch/api/search.go | 10 ++++++ internal/elasticsearch/client.go | 2 +- internal/elasticsearch/v5/client.go | 2 +- internal/elasticsearch/v6/client.go | 2 +- internal/elasticsearch/v7/client.go | 2 +- internal/elasticsearch/v8/search.go | 36 ++++++++++++++++------ source/config.go | 4 +++ source/paramgen.go | 18 +++++++++++ source/position.go | 8 ++--- source/position_test.go | 16 +++++----- source/source.go | 14 +++++++-- source/worker.go | 46 ++++++++++++++++++++++------ 13 files changed, 136 insertions(+), 64 deletions(-) diff --git a/destination/client_moq_test.go b/destination/client_moq_test.go index ac99814..12a3f2f 100644 --- a/destination/client_moq_test.go +++ b/destination/client_moq_test.go @@ -36,7 +36,7 @@ var _ client = &clientMock{} // PrepareUpsertOperationFunc: func(key string, item opencdc.Record) (interface{}, interface{}, error) { // panic("mock out the PrepareUpsertOperation method") // }, -// SearchFunc: func(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) { +// SearchFunc: func(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) { // panic("mock out the Search method") // }, // } @@ -62,7 +62,7 @@ type clientMock struct { PrepareUpsertOperationFunc func(key string, item opencdc.Record) (interface{}, interface{}, error) // SearchFunc mocks the Search method. - SearchFunc func(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) + SearchFunc func(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) // calls tracks calls to the methods. calls struct { @@ -99,12 +99,8 @@ type clientMock struct { Search []struct { // Ctx is the ctx argument value. Ctx context.Context - // Index is the index argument value. - Index string - // Offset is the offset argument value. - Offset *int - // Size is the size argument value. - Size *int + // Request is the request argument value. + Request *api.SearchRequest } } lockBulk sync.RWMutex @@ -284,25 +280,21 @@ func (mock *clientMock) PrepareUpsertOperationCalls() []struct { } // Search calls SearchFunc. -func (mock *clientMock) Search(ctx context.Context, index string, offset *int, size *int) (*api.SearchResponse, error) { +func (mock *clientMock) Search(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) { if mock.SearchFunc == nil { panic("clientMock.SearchFunc: method is nil but client.Search was just called") } callInfo := struct { - Ctx context.Context - Index string - Offset *int - Size *int + Ctx context.Context + Request *api.SearchRequest }{ - Ctx: ctx, - Index: index, - Offset: offset, - Size: size, + Ctx: ctx, + Request: request, } mock.lockSearch.Lock() mock.calls.Search = append(mock.calls.Search, callInfo) mock.lockSearch.Unlock() - return mock.SearchFunc(ctx, index, offset, size) + return mock.SearchFunc(ctx, request) } // SearchCalls gets all the calls that were made to Search. @@ -310,16 +302,12 @@ func (mock *clientMock) Search(ctx context.Context, index string, offset *int, s // // len(mockedclient.SearchCalls()) func (mock *clientMock) SearchCalls() []struct { - Ctx context.Context - Index string - Offset *int - Size *int + Ctx context.Context + Request *api.SearchRequest } { var calls []struct { - Ctx context.Context - Index string - Offset *int - Size *int + Ctx context.Context + Request *api.SearchRequest } mock.lockSearch.RLock() calls = mock.calls.Search diff --git a/internal/elasticsearch/api/search.go b/internal/elasticsearch/api/search.go index 9002d94..26bab86 100644 --- a/internal/elasticsearch/api/search.go +++ b/internal/elasticsearch/api/search.go @@ -14,6 +14,15 @@ package api +// SearchRequest is the request for calling ElasticSearch api. +type SearchRequest struct { + Index string `json:"index"` + Size *int `json:"size:"` + SearchAfter int64 `json:"searchAfter"` + SortBy string `json:"sortBy"` + Order string `json:"order"` +} + // SearchResponse is the JSON response from Elasticsearch search query. type SearchResponse struct { Hits struct { @@ -21,6 +30,7 @@ type SearchResponse struct { Index string `json:"_index"` ID string `json:"_id"` Source map[string]any `json:"_source"` + Sort []int64 `json:"sort"` // used for search_after } `json:"hits"` } `json:"hits"` } diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index e4cf0c2..3ccd7a5 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -42,5 +42,5 @@ type Client interface { PrepareDeleteOperation(key string) (metadata interface{}, err error) // Search calls the elasticsearch search api and retuns SearchResponse read from an index. - Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) + Search(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) } diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index e2f6ba0..36674a3 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -163,7 +163,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index d189496..a43f35a 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -166,7 +166,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index 034a9ef..4d15ce6 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -165,7 +165,7 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { } // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(_ context.Context, _ string, _, _ *int) (*api.SearchResponse, error) { +func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api return nil, nil //nolint:nilnil // implementation to be done } diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index 7b43ef0..d66a6de 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "log" "strings" "time" @@ -27,17 +28,12 @@ import ( ) // Search calls the elasticsearch search api and retuns SearchResponse read from an index. -func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*api.SearchResponse, error) { +func (c *Client) Search(ctx context.Context, request *api.SearchRequest) (*api.SearchResponse, error) { // Create the search request req := esapi.SearchRequest{ - Index: []string{index}, - Body: strings.NewReader(`{ - "query": { - "match_all": {} - } - }`), - From: offset, - Size: size, + Index: []string{request.Index}, + Body: strings.NewReader(createSearchBody(request.SortBy, request.Order, request.SearchAfter)), + Size: request.Size, } // Perform the request @@ -60,3 +56,25 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* return response, nil } + +func createSearchBody(sortByField, order string, searchAfter int64) string { + body := map[string]interface{}{ + "query": map[string]interface{}{ + "match_all": struct{}{}, + }, + "sort": []map[string]string{ + {sortByField: order}, + }, + } + + if searchAfter > 0 { + body["search_after"] = []int64{searchAfter} + } + + jsonBody, err := json.Marshal(body) + if err != nil { + log.Printf("error marshaling the search request body: %s", err) + } + + return string(jsonBody) +} diff --git a/source/config.go b/source/config.go index c79c520..dc81811 100644 --- a/source/config.go +++ b/source/config.go @@ -41,6 +41,10 @@ type Config struct { CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. Indexes []string `json:"indexes" validate:"required"` + // The sortbyField for each index to be used by elasticsearch search api. + IndexSortFields []string `json:"indexSortFields" validate:"required"` + // The sortOrders for each index to be used by elasticsearch search api. + SortOrders []string `json:"sortOrders" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. diff --git a/source/paramgen.go b/source/paramgen.go index d9f45dd..0ec8473 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -13,10 +13,12 @@ const ( ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" + ConfigIndexSortFields = "indexSortFields" ConfigIndexes = "indexes" ConfigPassword = "password" ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" + ConfigSortOrders = "sortOrders" ConfigUsername = "username" ConfigVersion = "version" ) @@ -55,6 +57,14 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, + ConfigIndexSortFields: { + Default: "", + Description: "The sortbyField for each index to be used by elasticsearch search api.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, ConfigIndexes: { Default: "", Description: "The name of the indexes to read data from.", @@ -81,6 +91,14 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, + ConfigSortOrders: { + Default: "", + Description: "The sortOrders for each index to be used by elasticsearch search api.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, ConfigUsername: { Default: "", Description: "The username for HTTP Basic Authentication.", diff --git a/source/position.go b/source/position.go index 6ee89e4..ea9e267 100644 --- a/source/position.go +++ b/source/position.go @@ -25,12 +25,12 @@ import ( // Position represents position of a document in an index. type Position struct { mu sync.Mutex - IndexPositions map[string]int `json:"indexPositions"` + IndexPositions map[string]int64 `json:"indexPositions"` } // NewPosition initializes a new position when sdk position is nil. func NewPosition() *Position { - return &Position{IndexPositions: make(map[string]int)} + return &Position{IndexPositions: make(map[string]int64)} } // ParseSDKPosition parses opencdc.Position and returns Position. @@ -60,8 +60,8 @@ func (p *Position) marshal() (opencdc.Position, error) { } // update updates an index position in the source position. -func (p *Position) update(index string, pos int) { +func (p *Position) update(index string, lastRecordSortID int64) { p.mu.Lock() defer p.mu.Unlock() - p.IndexPositions[index] = pos + p.IndexPositions[index] = lastRecordSortID } diff --git a/source/position_test.go b/source/position_test.go index d0bed6a..9d3b942 100644 --- a/source/position_test.go +++ b/source/position_test.go @@ -51,7 +51,7 @@ func TestParseSDKPosition(t *testing.T) { "index": 10 } }`), - wantPos: &Position{IndexPositions: map[string]int{"index": 10}}, + wantPos: &Position{IndexPositions: map[string]int64{"index": 10}}, }, } @@ -82,12 +82,12 @@ func TestMarshal(t *testing.T) { }{ { name: "successful marshal", - in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int64{"a": 1, "b": 2}}, want: []byte(`{"indexPositions":{"a":1,"b":2}}`), }, { name: "marshal empty map", - in: &Position{IndexPositions: map[string]int{}}, + in: &Position{IndexPositions: map[string]int64{}}, want: []byte(`{"indexPositions":{}}`), }, { @@ -115,22 +115,22 @@ func TestUpdate(t *testing.T) { name string in *Position updateIndex string - updatePos int + updatePos int64 want *Position }{ { name: "update existing index", - in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int64{"a": 1, "b": 2}}, updateIndex: "a", updatePos: 10, - want: &Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, + want: &Position{IndexPositions: map[string]int64{"a": 10, "b": 2}}, }, { name: "update new index", - in: &Position{IndexPositions: map[string]int{"a": 1}}, + in: &Position{IndexPositions: map[string]int64{"a": 1}}, updateIndex: "b", updatePos: 5, - want: &Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, + want: &Position{IndexPositions: map[string]int64{"a": 1, "b": 5}}, }, } for _, tt := range tests { diff --git a/source/source.go b/source/source.go index dbe8cf0..281dd1b 100644 --- a/source/source.go +++ b/source/source.go @@ -56,6 +56,14 @@ func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error { return err } + // custom validations + if len(s.config.Indexes) != len(s.config.IndexSortFields) { + return fmt.Errorf("each index should have a respective sort field") + } + if len(s.config.IndexSortFields) != len(s.config.SortOrders) { + return fmt.Errorf("each sortfield should have a respective sort order") + } + return nil } @@ -84,12 +92,12 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.shutdown = make(chan struct{}) s.wg = &sync.WaitGroup{} - for _, index := range s.config.Indexes { + for i, index := range s.config.Indexes { s.wg.Add(1) - offset := s.position.IndexPositions[index] + lastRecordSortID := s.position.IndexPositions[index] // a new worker for a new index - NewWorker(ctx, s, index, offset) + NewWorker(ctx, s, index, s.config.IndexSortFields[i], s.config.SortOrders[i], lastRecordSortID) } return nil diff --git a/source/worker.go b/source/worker.go index f01f3e8..9c414fd 100644 --- a/source/worker.go +++ b/source/worker.go @@ -20,22 +20,34 @@ import ( "log" "time" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" ) type Worker struct { - source *Source - index string - offset int + source *Source + index string + sortByField string + orderBy string + lastRecordSortID int64 } // NewWorker create a new worker goroutine and starts polling elasticsearch for new records. -func NewWorker(ctx context.Context, source *Source, index string, offset int) { +func NewWorker( + ctx context.Context, + source *Source, + index string, + sortByField string, + orderBy string, + lastRecordSortID int64, +) { worker := &Worker{ - source: source, - index: index, - offset: offset, + source: source, + index: index, + sortByField: sortByField, + orderBy: orderBy, + lastRecordSortID: lastRecordSortID, } go worker.start(ctx) @@ -46,7 +58,15 @@ func (w *Worker) start(ctx context.Context) { defer w.source.wg.Done() for { - response, err := w.source.client.Search(ctx, w.index, &w.offset, &w.source.config.BatchSize) + request := &api.SearchRequest{ + Index: w.index, + Size: &w.source.config.BatchSize, + SearchAfter: w.lastRecordSortID, + SortBy: w.sortByField, + Order: w.orderBy, + } + + response, err := w.source.client.Search(ctx, request) if err != nil || len(response.Hits.Hits) == 0 { if err != nil { log.Println("search() err:", err) @@ -74,7 +94,13 @@ func (w *Worker) start(ctx context.Context) { continue } - w.source.position.update(hit.Index, w.offset+1) + if len(hit.Sort) == 0 { + // this should never happen + sdk.Logger(ctx).Err(err).Msg("error hit.Sort is empty") + continue + } + + w.source.position.update(hit.Index, hit.Sort[0]) sdkPosition, err := w.source.position.marshal() if err != nil { @@ -89,7 +115,7 @@ func (w *Worker) start(ctx context.Context) { select { case w.source.ch <- record: - w.offset++ + w.lastRecordSortID = hit.Sort[0] case <-w.source.shutdown: sdk.Logger(ctx).Debug().Msg("worker shutting down...") From 633523f17aa86b289a4eeb6dd0b894ae7493ceb0 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 18 Oct 2024 15:12:22 +0530 Subject: [PATCH 29/36] fix: config used map[string]type --- README.md | 2 ++ source/config.go | 14 ++++++++------ source/paramgen.go | 19 +++++-------------- source/source.go | 12 ++---------- 4 files changed, 17 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index e59bc07..718e797 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ ElasticSearch source connector allows you to move data from multiple Elasticsear | `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | | | `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | | | `indexes` | The name of the indexes to read the data from. | `true` | | +| `indexes.*.sortBy` | The sortby field for each index to be used by elasticsearch search api. | `true` | | +| `indexes.*.sortOrder` | The sortOrder (asc or desc) for each index to be used by elasticsearch search api. | `true` | | | `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` | | `pollingPeriod` | The duration for polling the search api for fetching new records. | `false` | `"5s"` | diff --git a/source/config.go b/source/config.go index dc81811..96efb3c 100644 --- a/source/config.go +++ b/source/config.go @@ -39,17 +39,19 @@ type Config struct { ServiceToken string `json:"serviceToken"` // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` - // The name of the indexes to read data from. - Indexes []string `json:"indexes" validate:"required"` - // The sortbyField for each index to be used by elasticsearch search api. - IndexSortFields []string `json:"indexSortFields" validate:"required"` - // The sortOrders for each index to be used by elasticsearch search api. - SortOrders []string `json:"sortOrders" validate:"required"` + // The name of the indexes and sort details to read data from. + Indexes map[string]Sort `json:"indexes" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"` } +type Sort struct { + // The sortbyField for each index to be used by elasticsearch search api. + SortBy string `json:"sortBy" validate:"required"` + // The sortOrders(asc or desc) for each index to be used by elasticsearch search api. + SortOrder string `json:"sortOrder" validate:"required"` +} func (c Config) GetHost() string { return c.Host diff --git a/source/paramgen.go b/source/paramgen.go index 0ec8473..fdbed5c 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -13,12 +13,11 @@ const ( ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" - ConfigIndexSortFields = "indexSortFields" - ConfigIndexes = "indexes" + ConfigIndexesSortBy = "indexes.*.sortBy" + ConfigIndexesSortOrder = "indexes.*.sortOrder" ConfigPassword = "password" ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" - ConfigSortOrders = "sortOrders" ConfigUsername = "username" ConfigVersion = "version" ) @@ -57,7 +56,7 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndexSortFields: { + ConfigIndexesSortBy: { Default: "", Description: "The sortbyField for each index to be used by elasticsearch search api.", Type: config.ParameterTypeString, @@ -65,9 +64,9 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndexes: { + ConfigIndexesSortOrder: { Default: "", - Description: "The name of the indexes to read data from.", + Description: "The sortOrders(asc or desc) for each index to be used by elasticsearch search api.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, @@ -91,14 +90,6 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, - ConfigSortOrders: { - Default: "", - Description: "The sortOrders for each index to be used by elasticsearch search api.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, ConfigUsername: { Default: "", Description: "The username for HTTP Basic Authentication.", diff --git a/source/source.go b/source/source.go index 281dd1b..a0ab2d7 100644 --- a/source/source.go +++ b/source/source.go @@ -56,14 +56,6 @@ func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error { return err } - // custom validations - if len(s.config.Indexes) != len(s.config.IndexSortFields) { - return fmt.Errorf("each index should have a respective sort field") - } - if len(s.config.IndexSortFields) != len(s.config.SortOrders) { - return fmt.Errorf("each sortfield should have a respective sort order") - } - return nil } @@ -92,12 +84,12 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.shutdown = make(chan struct{}) s.wg = &sync.WaitGroup{} - for i, index := range s.config.Indexes { + for index, sort := range s.config.Indexes { s.wg.Add(1) lastRecordSortID := s.position.IndexPositions[index] // a new worker for a new index - NewWorker(ctx, s, index, s.config.IndexSortFields[i], s.config.SortOrders[i], lastRecordSortID) + NewWorker(ctx, s, index, sort.SortBy, sort.SortOrder, lastRecordSortID) } return nil From 0d4d39ff67b825e02fc850a6d7a7c162322ef0d8 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 18 Oct 2024 15:30:13 +0530 Subject: [PATCH 30/36] fix: pr comment esapi modules updated --- go.mod | 1 - internal/elasticsearch/v8/search.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go.mod b/go.mod index e16ce10..6ef8048 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.23.2 require ( github.com/conduitio/conduit-commons v0.4.0 github.com/conduitio/conduit-connector-sdk v0.11.0 - github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v5 v5.6.1 github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index d66a6de..1b5a620 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -24,7 +24,7 @@ import ( "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" - "github.com/elastic/go-elasticsearch/esapi" + "github.com/elastic/go-elasticsearch/v8/esapi" ) // Search calls the elasticsearch search api and retuns SearchResponse read from an index. From 0bc3124f479bea8197380bb37a25063d95a336bb Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 18 Oct 2024 16:00:54 +0530 Subject: [PATCH 31/36] fix: go.sum --- go.sum | 2 -- source/config.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go.sum b/go.sum index b16d12c..39c8868 100644 --- a/go.sum +++ b/go.sum @@ -96,8 +96,6 @@ github.com/denis-tingaikin/go-header v0.5.0 h1:SRdnP5ZKvcO9KKRP1KJrhFR3RrlGuD+42 github.com/denis-tingaikin/go-header v0.5.0/go.mod h1:mMenU5bWrok6Wl2UsZjy+1okegmwQ3UgWl4V1D8gjlY= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= -github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch/v5 v5.6.1 h1:RnL2wcXepOT5SdoKMMO1j1OBX0vxHYbBtkQNL2E3xs4= github.com/elastic/go-elasticsearch/v5 v5.6.1/go.mod h1:r7uV7HidpfkYh7D8SB4lkS13TNlNy3oa5GNmTZvuVqY= github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= diff --git a/source/config.go b/source/config.go index 96efb3c..e0125ae 100644 --- a/source/config.go +++ b/source/config.go @@ -46,6 +46,7 @@ type Config struct { // This period is used by workers to poll for new data at regular intervals. PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"` } + type Sort struct { // The sortbyField for each index to be used by elasticsearch search api. SortBy string `json:"sortBy" validate:"required"` From 5425b2303ac23e7cda395716abf10b321688105c Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Thu, 24 Oct 2024 18:47:04 +0530 Subject: [PATCH 32/36] fix: sort by seq_no --- internal/elasticsearch/v8/search.go | 10 ++++++---- source/config.go | 11 ++--------- source/paramgen.go | 15 +++------------ source/source.go | 4 ++-- source/worker.go | 4 ---- 5 files changed, 13 insertions(+), 31 deletions(-) diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index 1b5a620..2ca183e 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -32,7 +32,7 @@ func (c *Client) Search(ctx context.Context, request *api.SearchRequest) (*api.S // Create the search request req := esapi.SearchRequest{ Index: []string{request.Index}, - Body: strings.NewReader(createSearchBody(request.SortBy, request.Order, request.SearchAfter)), + Body: strings.NewReader(createSearchBody(request.SearchAfter)), Size: request.Size, } @@ -57,13 +57,15 @@ func (c *Client) Search(ctx context.Context, request *api.SearchRequest) (*api.S return response, nil } -func createSearchBody(sortByField, order string, searchAfter int64) string { +func createSearchBody(searchAfter int64) string { body := map[string]interface{}{ "query": map[string]interface{}{ "match_all": struct{}{}, }, - "sort": []map[string]string{ - {sortByField: order}, + "sort": []map[string]interface{}{ + {"_seq_no": map[string]string{ + "order": "asc", + }}, }, } diff --git a/source/config.go b/source/config.go index e0125ae..c79c520 100644 --- a/source/config.go +++ b/source/config.go @@ -39,21 +39,14 @@ type Config struct { ServiceToken string `json:"serviceToken"` // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` - // The name of the indexes and sort details to read data from. - Indexes map[string]Sort `json:"indexes" validate:"required"` + // The name of the indexes to read data from. + Indexes []string `json:"indexes" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"` } -type Sort struct { - // The sortbyField for each index to be used by elasticsearch search api. - SortBy string `json:"sortBy" validate:"required"` - // The sortOrders(asc or desc) for each index to be used by elasticsearch search api. - SortOrder string `json:"sortOrder" validate:"required"` -} - func (c Config) GetHost() string { return c.Host } diff --git a/source/paramgen.go b/source/paramgen.go index fdbed5c..d9f45dd 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -13,8 +13,7 @@ const ( ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" - ConfigIndexesSortBy = "indexes.*.sortBy" - ConfigIndexesSortOrder = "indexes.*.sortOrder" + ConfigIndexes = "indexes" ConfigPassword = "password" ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" @@ -56,17 +55,9 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndexesSortBy: { + ConfigIndexes: { Default: "", - Description: "The sortbyField for each index to be used by elasticsearch search api.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - ConfigIndexesSortOrder: { - Default: "", - Description: "The sortOrders(asc or desc) for each index to be used by elasticsearch search api.", + Description: "The name of the indexes to read data from.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, diff --git a/source/source.go b/source/source.go index a0ab2d7..2116a24 100644 --- a/source/source.go +++ b/source/source.go @@ -84,12 +84,12 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.shutdown = make(chan struct{}) s.wg = &sync.WaitGroup{} - for index, sort := range s.config.Indexes { + for _, index := range s.config.Indexes { s.wg.Add(1) lastRecordSortID := s.position.IndexPositions[index] // a new worker for a new index - NewWorker(ctx, s, index, sort.SortBy, sort.SortOrder, lastRecordSortID) + NewWorker(ctx, s, index, lastRecordSortID) } return nil diff --git a/source/worker.go b/source/worker.go index 9c414fd..f64eab4 100644 --- a/source/worker.go +++ b/source/worker.go @@ -38,15 +38,11 @@ func NewWorker( ctx context.Context, source *Source, index string, - sortByField string, - orderBy string, lastRecordSortID int64, ) { worker := &Worker{ source: source, index: index, - sortByField: sortByField, - orderBy: orderBy, lastRecordSortID: lastRecordSortID, } From 8a9a4263a86bed0108e15c708e9f4390456b82cf Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 25 Oct 2024 15:10:02 +0530 Subject: [PATCH 33/36] fix: handled zeroth record scenario --- internal/elasticsearch/api/search.go | 10 +++++----- internal/elasticsearch/v8/search.go | 6 +++--- source/source.go | 9 +++++++-- source/worker.go | 19 ++++++++++++++----- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/internal/elasticsearch/api/search.go b/internal/elasticsearch/api/search.go index 26bab86..8d84243 100644 --- a/internal/elasticsearch/api/search.go +++ b/internal/elasticsearch/api/search.go @@ -16,11 +16,11 @@ package api // SearchRequest is the request for calling ElasticSearch api. type SearchRequest struct { - Index string `json:"index"` - Size *int `json:"size:"` - SearchAfter int64 `json:"searchAfter"` - SortBy string `json:"sortBy"` - Order string `json:"order"` + Index string `json:"index"` + Size *int `json:"size:"` + SearchAfter []int64 `json:"searchAfter"` + SortBy string `json:"sortBy"` + Order string `json:"order"` } // SearchResponse is the JSON response from Elasticsearch search query. diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index 2ca183e..502faa8 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -57,7 +57,7 @@ func (c *Client) Search(ctx context.Context, request *api.SearchRequest) (*api.S return response, nil } -func createSearchBody(searchAfter int64) string { +func createSearchBody(searchAfter []int64) string { body := map[string]interface{}{ "query": map[string]interface{}{ "match_all": struct{}{}, @@ -69,8 +69,8 @@ func createSearchBody(searchAfter int64) string { }, } - if searchAfter > 0 { - body["search_after"] = []int64{searchAfter} + if len(searchAfter) == 1 { + body["search_after"] = searchAfter } jsonBody, err := json.Marshal(body) diff --git a/source/source.go b/source/source.go index 2116a24..9b36a75 100644 --- a/source/source.go +++ b/source/source.go @@ -86,10 +86,15 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { for _, index := range s.config.Indexes { s.wg.Add(1) - lastRecordSortID := s.position.IndexPositions[index] + var init bool + lastRecordSortID, ok := s.position.IndexPositions[index] + if !ok { + // read from scratch + init = true + } // a new worker for a new index - NewWorker(ctx, s, index, lastRecordSortID) + NewWorker(ctx, s, index, lastRecordSortID, init) } return nil diff --git a/source/worker.go b/source/worker.go index f64eab4..3c97222 100644 --- a/source/worker.go +++ b/source/worker.go @@ -31,6 +31,7 @@ type Worker struct { sortByField string orderBy string lastRecordSortID int64 + init bool } // NewWorker create a new worker goroutine and starts polling elasticsearch for new records. @@ -39,11 +40,13 @@ func NewWorker( source *Source, index string, lastRecordSortID int64, + init bool, ) { worker := &Worker{ source: source, index: index, lastRecordSortID: lastRecordSortID, + init: init, } go worker.start(ctx) @@ -55,11 +58,15 @@ func (w *Worker) start(ctx context.Context) { for { request := &api.SearchRequest{ - Index: w.index, - Size: &w.source.config.BatchSize, - SearchAfter: w.lastRecordSortID, - SortBy: w.sortByField, - Order: w.orderBy, + Index: w.index, + Size: &w.source.config.BatchSize, + SortBy: w.sortByField, + Order: w.orderBy, + } + if w.init { + request.SearchAfter = []int64{} + } else { + request.SearchAfter = []int64{w.lastRecordSortID} } response, err := w.source.client.Search(ctx, request) @@ -78,6 +85,8 @@ func (w *Worker) start(ctx context.Context) { } } + w.init = false + for _, hit := range response.Hits.Hits { metadata := opencdc.Metadata{ opencdc.MetadataCollection: hit.Index, From 961b6f93a385e5feb4456914ab7edc908188a581 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 25 Oct 2024 15:20:27 +0530 Subject: [PATCH 34/36] fix: used context for shutdown --- source/source.go | 3 --- source/worker.go | 8 ++++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/source/source.go b/source/source.go index 9b36a75..d2dd7e4 100644 --- a/source/source.go +++ b/source/source.go @@ -32,7 +32,6 @@ type Source struct { client elasticsearch.Client position *Position ch chan opencdc.Record - shutdown chan struct{} wg *sync.WaitGroup } @@ -81,7 +80,6 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { } s.ch = make(chan opencdc.Record, s.config.BatchSize) - s.shutdown = make(chan struct{}) s.wg = &sync.WaitGroup{} for _, index := range s.config.Indexes { @@ -125,7 +123,6 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { // Teardown gracefully shutdown connector. func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - close(s.shutdown) // wait for goroutines to finish s.wg.Wait() // close the read channel for write diff --git a/source/worker.go b/source/worker.go index 3c97222..10d5c9b 100644 --- a/source/worker.go +++ b/source/worker.go @@ -17,7 +17,6 @@ package source import ( "context" "encoding/json" - "log" "time" "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" @@ -72,11 +71,12 @@ func (w *Worker) start(ctx context.Context) { response, err := w.source.client.Search(ctx, request) if err != nil || len(response.Hits.Hits) == 0 { if err != nil { - log.Println("search() err:", err) + sdk.Logger(ctx).Err(err).Msg("worker shutting down...") + return } select { - case <-w.source.shutdown: + case <-ctx.Done(): sdk.Logger(ctx).Debug().Msg("worker shutting down...") return @@ -122,7 +122,7 @@ func (w *Worker) start(ctx context.Context) { case w.source.ch <- record: w.lastRecordSortID = hit.Sort[0] - case <-w.source.shutdown: + case <-ctx.Done(): sdk.Logger(ctx).Debug().Msg("worker shutting down...") return } From 09827c25c2a554e0e300026817ab7af3c5acd490 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Fri, 25 Oct 2024 16:56:27 +0530 Subject: [PATCH 35/36] fix: removed source from worker --- source/source.go | 2 +- source/worker.go | 43 ++++++++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/source/source.go b/source/source.go index d2dd7e4..fe13fd5 100644 --- a/source/source.go +++ b/source/source.go @@ -92,7 +92,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { } // a new worker for a new index - NewWorker(ctx, s, index, lastRecordSortID, init) + NewWorker(ctx, s.client, index, lastRecordSortID, init, s.config.PollingPeriod, s.config.BatchSize, s.wg, s.ch, s.position) } return nil diff --git a/source/worker.go b/source/worker.go index 10d5c9b..efdcd27 100644 --- a/source/worker.go +++ b/source/worker.go @@ -17,35 +17,50 @@ package source import ( "context" "encoding/json" + "sync" "time" + "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch" "github.com/conduitio-labs/conduit-connector-elasticsearch/internal/elasticsearch/api" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" ) type Worker struct { - source *Source + client elasticsearch.Client index string - sortByField string - orderBy string lastRecordSortID int64 init bool + pollingPeriod time.Duration + batchSize int + wg *sync.WaitGroup + ch chan opencdc.Record + position *Position } // NewWorker create a new worker goroutine and starts polling elasticsearch for new records. func NewWorker( ctx context.Context, - source *Source, + client elasticsearch.Client, index string, lastRecordSortID int64, init bool, + pollingPeriod time.Duration, + batchSize int, + wg *sync.WaitGroup, + ch chan opencdc.Record, + position *Position, ) { worker := &Worker{ - source: source, + client: client, index: index, lastRecordSortID: lastRecordSortID, init: init, + pollingPeriod: pollingPeriod, + batchSize: batchSize, + wg: wg, + ch: ch, + position: position, } go worker.start(ctx) @@ -53,14 +68,12 @@ func NewWorker( // start polls elasticsearch for new records and writes it into the source channel. func (w *Worker) start(ctx context.Context) { - defer w.source.wg.Done() + defer w.wg.Done() for { request := &api.SearchRequest{ - Index: w.index, - Size: &w.source.config.BatchSize, - SortBy: w.sortByField, - Order: w.orderBy, + Index: w.index, + Size: &w.batchSize, } if w.init { request.SearchAfter = []int64{} @@ -68,7 +81,7 @@ func (w *Worker) start(ctx context.Context) { request.SearchAfter = []int64{w.lastRecordSortID} } - response, err := w.source.client.Search(ctx, request) + response, err := w.client.Search(ctx, request) if err != nil || len(response.Hits.Hits) == 0 { if err != nil { sdk.Logger(ctx).Err(err).Msg("worker shutting down...") @@ -80,7 +93,7 @@ func (w *Worker) start(ctx context.Context) { sdk.Logger(ctx).Debug().Msg("worker shutting down...") return - case <-time.After(w.source.config.PollingPeriod): + case <-time.After(w.pollingPeriod): continue } } @@ -105,9 +118,9 @@ func (w *Worker) start(ctx context.Context) { continue } - w.source.position.update(hit.Index, hit.Sort[0]) + w.position.update(hit.Index, hit.Sort[0]) - sdkPosition, err := w.source.position.marshal() + sdkPosition, err := w.position.marshal() if err != nil { sdk.Logger(ctx).Err(err).Msg("error marshal position") continue @@ -119,7 +132,7 @@ func (w *Worker) start(ctx context.Context) { record := sdk.Util.Source.NewRecordCreate(sdkPosition, metadata, key, opencdc.RawData(payload)) select { - case w.source.ch <- record: + case w.ch <- record: w.lastRecordSortID = hit.Sort[0] case <-ctx.Done(): From def6c6e879e9ad93a3606b33c1585b403922f8a9 Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Mon, 4 Nov 2024 10:24:42 +0530 Subject: [PATCH 36/36] fix: pr comments --- README.md | 4 +--- internal/elasticsearch/v5/client.go | 2 +- internal/elasticsearch/v6/client.go | 2 +- internal/elasticsearch/v7/client.go | 2 +- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 0ff40bd..a8eda9f 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ ElasticSearch source connector allows you to move data from multiple Elasticsear ## Configuration Options | name | description | required | default | |--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|----------| -| `version` | The version of the Elasticsearch service. One of: `5`, `6`, `7`, `8`. | `true` | | +| `version` | The version of the Elasticsearch service. Currently supports version `8`. | `true` | | | `host` | The Elasticsearch host and port (e.g.: http://127.0.0.1:9200). | `true` | | | `username` | [v: 5, 6, 7, 8] The username for HTTP Basic Authentication. | `false` | | | `password` | [v: 5, 6, 7, 8] The password for HTTP Basic Authentication. | `true` when username was provided, `false` otherwise | | @@ -51,8 +51,6 @@ ElasticSearch source connector allows you to move data from multiple Elasticsear | `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | | | `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | | | `indexes` | The name of the indexes to read the data from. | `true` | | -| `indexes.*.sortBy` | The sortby field for each index to be used by elasticsearch search api. | `true` | | -| `indexes.*.sortOrder` | The sortOrder (asc or desc) for each index to be used by elasticsearch search api. | `true` | | | `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` | | `pollingPeriod` | The duration for polling the search api for fetching new records. | `false` | `"5s"` | diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index 86728e9..11efc20 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -165,5 +165,5 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { // Search calls the elasticsearch search api and retuns SearchResponse read from an index. func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil //nolint:nilnil // implementation to be done + return nil, fmt.Errorf("method not implemented") } diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index b75abc9..66c4ebc 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -168,5 +168,5 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { // Search calls the elasticsearch search api and retuns SearchResponse read from an index. func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil //nolint:nilnil // implementation to be done + return nil, fmt.Errorf("method not implemented") } diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index ad02b66..16ba174 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -167,5 +167,5 @@ func preparePayload(item *opencdc.Record) (json.RawMessage, error) { // Search calls the elasticsearch search api and retuns SearchResponse read from an index. func (c *Client) Search(_ context.Context, _ *api.SearchRequest) (*api.SearchResponse, error) { // TODO: implement elasticsearch search api - return nil, nil //nolint:nilnil // implementation to be done + return nil, fmt.Errorf("method not implemented") }