From e33d61a9f1ea21de2158b0fda14634723a49725d Mon Sep 17 00:00:00 2001 From: Maciej Kudas Date: Wed, 4 May 2022 13:52:10 +0200 Subject: [PATCH] Distinquish between delete and upsert operations --- cmd/es/main.go | 6 +- destination/destination.go | 279 ++++++++++++++++++++++++------------- go.mod | 4 +- go.sum | 11 +- 4 files changed, 194 insertions(+), 106 deletions(-) diff --git a/cmd/es/main.go b/cmd/es/main.go index b1756e6..def534b 100644 --- a/cmd/es/main.go +++ b/cmd/es/main.go @@ -2,10 +2,10 @@ package main import ( sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/miquido/conduit-connector-elasticsearch" - "github.com/miquido/conduit-connector-elasticsearch/destination" + es "github.com/miquido/conduit-connector-elasticsearch" + esDestination "github.com/miquido/conduit-connector-elasticsearch/destination" ) func main() { - sdk.Serve(elasticsearch.Specification, nil, destination.NewDestination) + sdk.Serve(es.Specification, nil, esDestination.NewDestination) } diff --git a/destination/destination.go b/destination/destination.go index 89490ff..77910d8 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -81,137 +81,225 @@ func (d *Destination) Flush(ctx context.Context) error { } // Prepare request payload - var data bytes.Buffer + data := &bytes.Buffer{} for _, item := range d.recordsBuffer { key := string(item.Key.Bytes()) - // actionAndMetadata - entryMetadata, err := json.Marshal(actionAndMetadata{ - Update: struct { - ID string `json:"_id"` - Index string `json:"_index"` - RetryOnConflict int `json:"retry_on_conflict"` - }{ - ID: key, - Index: d.config.Index, - RetryOnConflict: 3, - }, - }) - if err != nil { - return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err) - } + switch action := item.Metadata["action"]; action { + case "created", "updated": + if err := d.writeUpsertOperation(key, data, item); err != nil { + return err + } - data.Write(entryMetadata) - data.WriteRune('\n') + case "deleted": + if err := d.writeDeleteOperation(key, data); err != nil { + return err + } - // optionalSource - sourcePayload := optionalSource{ - DocAsUpsert: true, + default: + sdk.Logger(ctx).Warn().Msgf("unsupported action: %+v", action) + + continue } + } - switch itemPayload := item.Payload.(type) { - case sdk.StructuredData: - // Payload is potentially convertable into JSON - itemPayloadMarshalled, err := json.Marshal(itemPayload) - if err != nil { - return fmt.Errorf("failed to prepare data with key=%s: %w", key, err) - } + // Send the bulk request + response, err := d.executeBulkRequest(ctx, data) + if err != nil { + return err + } + + for _, item := range response.Items { + var itemResponse bulkResponseItem + + switch { + case item.Update != nil: + itemResponse = *item.Update - sourcePayload.Doc = itemPayloadMarshalled + case item.Delete != nil: + itemResponse = *item.Delete default: - // Nothing more can be done, we can trust the source to provide valid JSON - sourcePayload.Doc = itemPayload.Bytes() + sdk.Logger(ctx).Warn().Msg("no update or delete details were found in Elasticsearch response") + + continue } - entrySource, err := json.Marshal(sourcePayload) + if err := d.sendAckForOperation(itemResponse); err != nil { + return err + } + } + + // Reset buffers + d.recordsBuffer = d.recordsBuffer[:0] + d.ackFuncsBuffer = make(map[string]sdk.AckFunc, d.config.BulkSize) + + return nil +} + +func (d *Destination) Teardown(context.Context) error { + return nil // No close routine needed +} + +func (d *Destination) writeDeleteOperation(key string, data *bytes.Buffer) error { + // actionAndMetadata + entryMetadata, err := json.Marshal(actionAndMetadata{ + Delete: &deleteAction{ + ID: key, + Index: d.config.Index, + }, + }) + if err != nil { + return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err) + } + + data.Write(entryMetadata) + data.WriteRune('\n') + return nil +} + +func (d *Destination) writeUpsertOperation(key string, data *bytes.Buffer, item sdk.Record) error { + // actionAndMetadata + entryMetadata, err := json.Marshal(actionAndMetadata{ + Update: &updateAction{ + ID: key, + Index: d.config.Index, + RetryOnConflict: 3, + }, + }) + if err != nil { + return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err) + } + + data.Write(entryMetadata) + data.WriteRune('\n') + + // optionalSource + sourcePayload := optionalSource{ + DocAsUpsert: true, + } + + switch itemPayload := item.Payload.(type) { + case sdk.StructuredData: + // Payload is potentially convertable into JSON + itemPayloadMarshalled, err := json.Marshal(itemPayload) if err != nil { return fmt.Errorf("failed to prepare data with key=%s: %w", key, err) } - data.Write(entrySource) - data.WriteRune('\n') + sourcePayload.Doc = itemPayloadMarshalled + + default: + // Nothing more can be done, we can trust the source to provide valid JSON + sourcePayload.Doc = itemPayload.Bytes() } - fmt.Println(data.String()) + entrySource, err := json.Marshal(sourcePayload) + if err != nil { + return fmt.Errorf("failed to prepare data with key=%s: %w", key, err) + } + + data.Write(entrySource) + data.WriteRune('\n') + return nil +} + +func (d *Destination) executeBulkRequest(ctx context.Context, data *bytes.Buffer) (bulkResponse, error) { + if data.Len() < 1 { + sdk.Logger(ctx).Info().Msg("no operations to execute in bulk, skipping") + + return bulkResponse{}, nil + } + + defer data.Reset() - // Send the bulk request result, err := d.client.Bulk(bytes.NewReader(data.Bytes()), d.client.Bulk.WithContext(ctx)) if err != nil { - return fmt.Errorf("bulk request failure: %w", err) + return bulkResponse{}, fmt.Errorf("bulk request failure: %w", err) } if result.IsError() { bodyContents, err := io.ReadAll(result.Body) if err != nil { - return fmt.Errorf("bulk request failure: failed to read the result: %w", err) + return bulkResponse{}, fmt.Errorf("bulk request failure: failed to read the result: %w", err) } defer result.Body.Close() var errorDetails genericError if err := json.Unmarshal(bodyContents, &errorDetails); err != nil { - return fmt.Errorf("bulk request failure: %s", result.Status()) + return bulkResponse{}, fmt.Errorf("bulk request failure: %s", result.Status()) } - return fmt.Errorf("bulk request failure: %s: %s", errorDetails.Error.Type, errorDetails.Error.Reason) + return bulkResponse{}, fmt.Errorf("bulk request failure: %s: %s", errorDetails.Error.Type, errorDetails.Error.Reason) } bodyContents, err := io.ReadAll(result.Body) if err != nil { - return fmt.Errorf("bulk response failure: failed to read the result: %w", err) + return bulkResponse{}, fmt.Errorf("bulk response failure: failed to read the result: %w", err) } defer result.Body.Close() - fmt.Println(string(bodyContents)) - // Read individual errors var response bulkResponse if err := json.Unmarshal(bodyContents, &response); err != nil { - return fmt.Errorf("bulk response failure: could not read the response: %w", err) + return bulkResponse{}, fmt.Errorf("bulk response failure: could not read the response: %w", err) } - for _, item := range response.Items { - ackFunc, exists := d.ackFuncsBuffer[item.Update.ID] - if !exists { - return fmt.Errorf("bulk response failure: could not ack item with key=%s: no ack function was registered", item.Update.ID) - } - - if item.Update.Status >= 200 && item.Update.Status < 300 { - if err := ackFunc(nil); err != nil { - return err - } + return response, nil +} - continue - } +func (d *Destination) sendAckForOperation(itemResponse bulkResponseItem) error { + ackFunc, exists := d.ackFuncsBuffer[itemResponse.ID] + if !exists { + return fmt.Errorf("bulk response failure: could not ack item with key=%s: no ack function was registered", itemResponse.ID) + } - if err := ackFunc(fmt.Errorf( - "item with key=%s upsert failure: [%s] %s: %s", - item.Update.ID, - item.Update.Error.Type, - item.Update.Error.Reason, - item.Update.Error.CausedBy, - )); err != nil { + if itemResponse.Status >= 200 && itemResponse.Status < 300 { + if err := ackFunc(nil); err != nil { return err } + + return nil } - // Reset buffers - d.recordsBuffer = d.recordsBuffer[:0] - d.ackFuncsBuffer = make(map[string]sdk.AckFunc, d.config.BulkSize) + var operationError error + + if itemResponse.Error == nil { + operationError = fmt.Errorf( + "item with key=%s upsert/delete failure: unknown error", + itemResponse.ID, + ) + } else { + operationError = fmt.Errorf( + "item with key=%s upsert/delete failure: [%s] %s: %s", + itemResponse.ID, + itemResponse.Error.Type, + itemResponse.Error.Reason, + itemResponse.Error.CausedBy, + ) + } + + if err := ackFunc(operationError); err != nil { + return err + } return nil } -func (d *Destination) Teardown(context.Context) error { - return nil // No close routine needed +type actionAndMetadata struct { + Update *updateAction `json:"update,omitempty"` + Delete *deleteAction `json:"delete,omitempty"` } -type actionAndMetadata struct { - Update struct { - ID string `json:"_id"` - Index string `json:"_index"` - RetryOnConflict int `json:"retry_on_conflict"` - } `json:"update"` +type updateAction struct { + ID string `json:"_id"` + Index string `json:"_index"` + RetryOnConflict int `json:"retry_on_conflict"` +} + +type deleteAction struct { + ID string `json:"_id"` + Index string `json:"_index"` } type optionalSource struct { @@ -223,29 +311,32 @@ type bulkResponse struct { Took int `json:"took"` Errors bool `json:"errors"` Items []struct { - Update struct { - Index string `json:"_index"` - Type string `json:"_type"` - ID string `json:"_id"` - Version int `json:"_version,omitempty"` - Result string `json:"result,omitempty"` - Shards struct { - Total int `json:"total"` - Successful int `json:"successful"` - Failed int `json:"failed"` - } `json:"_shards,omitempty"` - SeqNo int `json:"_seq_no,omitempty"` - PrimaryTerm int `json:"_primary_term,omitempty"` - Status int `json:"status"` - Error struct { - Type string `json:"type"` - Reason string `json:"reason"` - CausedBy json.RawMessage `json:"caused_by"` - } `json:"error,omitempty"` - } `json:"update"` + Update *bulkResponseItem `json:"update,omitempty"` + Delete *bulkResponseItem `json:"delete,omitempty"` } `json:"items"` } +type bulkResponseItem struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Version int `json:"_version,omitempty"` + Result string `json:"result,omitempty"` + Shards *struct { + Total int `json:"total"` + Successful int `json:"successful"` + Failed int `json:"failed"` + } `json:"_shards,omitempty"` + SeqNo int `json:"_seq_no,omitempty"` + PrimaryTerm int `json:"_primary_term,omitempty"` + Status int `json:"status"` + Error *struct { + Type string `json:"type"` + Reason string `json:"reason"` + CausedBy json.RawMessage `json:"caused_by"` + } `json:"error,omitempty"` +} + type genericError struct { Error struct { Type string `json:"type"` diff --git a/go.mod b/go.mod index 3ba663d..5874a22 100644 --- a/go.mod +++ b/go.mod @@ -23,9 +23,9 @@ require ( github.com/rs/zerolog v1.26.1 // indirect go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 // indirect golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect + golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect + google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect google.golang.org/grpc v1.46.0 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect diff --git a/go.sum b/go.sum index 21855e1..c1639d4 100644 --- a/go.sum +++ b/go.sum @@ -9,7 +9,6 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -28,7 +27,6 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= -github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -161,8 +159,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60= +golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -190,8 +188,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 h1:G1IeWbjrqEq9ChWxEuRPJu6laA67+XgTFHVSAvepr38= -google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= +google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 h1:DJUvgAPiJWeMBiT+RzBVcJGQN7bAEWS5UEoMshES9xs= +google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= @@ -201,7 +199,6 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= -google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=