Distinguish between delete and upsert operations
donatorsky committed May 4, 2022
1 parent 50fde35 commit e33d61a
Showing 4 changed files with 194 additions and 106 deletions.
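
For context, the connector now writes two kinds of action lines into the Bulk API payload: an "update" action followed by a doc_as_upsert source line for created/updated records, and a bare "delete" action for deleted records. Below is a minimal, self-contained Go sketch of that payload format; it uses simplified mirrors of the structs from this diff, and the index name ("users"), the document IDs and the sample document are illustrative only, not taken from the connector.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Simplified mirrors of the action metadata structs introduced in this commit;
// the field names and JSON tags follow the diff, everything else is illustrative.
type updateAction struct {
	ID              string `json:"_id"`
	Index           string `json:"_index"`
	RetryOnConflict int    `json:"retry_on_conflict"`
}

type deleteAction struct {
	ID    string `json:"_id"`
	Index string `json:"_index"`
}

type actionAndMetadata struct {
	Update *updateAction `json:"update,omitempty"`
	Delete *deleteAction `json:"delete,omitempty"`
}

func main() {
	var data bytes.Buffer

	// Upsert: an "update" action line followed by a source line with doc_as_upsert set.
	upsertMeta, err := json.Marshal(actionAndMetadata{
		Update: &updateAction{ID: "1", Index: "users", RetryOnConflict: 3},
	})
	if err != nil {
		panic(err)
	}
	data.Write(upsertMeta)
	data.WriteRune('\n')
	data.WriteString(`{"doc":{"name":"Jane"},"doc_as_upsert":true}` + "\n")

	// Delete: a single "delete" action line, with no source line.
	deleteMeta, err := json.Marshal(actionAndMetadata{
		Delete: &deleteAction{ID: "2", Index: "users"},
	})
	if err != nil {
		panic(err)
	}
	data.Write(deleteMeta)
	data.WriteRune('\n')

	// Prints the newline-delimited body that would be sent to the Bulk endpoint.
	fmt.Print(data.String())
}
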
6 changes: 3 additions & 3 deletions cmd/es/main.go
@@ -2,10 +2,10 @@ package main

import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/miquido/conduit-connector-elasticsearch"
"github.com/miquido/conduit-connector-elasticsearch/destination"
es "github.com/miquido/conduit-connector-elasticsearch"
esDestination "github.com/miquido/conduit-connector-elasticsearch/destination"
)

func main() {
sdk.Serve(elasticsearch.Specification, nil, destination.NewDestination)
sdk.Serve(es.Specification, nil, esDestination.NewDestination)
}
279 changes: 185 additions & 94 deletions destination/destination.go
@@ -81,137 +81,225 @@ func (d *Destination) Flush(ctx context.Context) error {
}

// Prepare request payload
var data bytes.Buffer
data := &bytes.Buffer{}

for _, item := range d.recordsBuffer {
key := string(item.Key.Bytes())

// actionAndMetadata
entryMetadata, err := json.Marshal(actionAndMetadata{
Update: struct {
ID string `json:"_id"`
Index string `json:"_index"`
RetryOnConflict int `json:"retry_on_conflict"`
}{
ID: key,
Index: d.config.Index,
RetryOnConflict: 3,
},
})
if err != nil {
return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err)
}
switch action := item.Metadata["action"]; action {
case "created", "updated":
if err := d.writeUpsertOperation(key, data, item); err != nil {
return err
}

data.Write(entryMetadata)
data.WriteRune('\n')
case "deleted":
if err := d.writeDeleteOperation(key, data); err != nil {
return err
}

// optionalSource
sourcePayload := optionalSource{
DocAsUpsert: true,
default:
sdk.Logger(ctx).Warn().Msgf("unsupported action: %+v", action)

continue
}
}

switch itemPayload := item.Payload.(type) {
case sdk.StructuredData:
// Payload is potentially convertible into JSON
itemPayloadMarshalled, err := json.Marshal(itemPayload)
if err != nil {
return fmt.Errorf("failed to prepare data with key=%s: %w", key, err)
}
// Send the bulk request
response, err := d.executeBulkRequest(ctx, data)
if err != nil {
return err
}

for _, item := range response.Items {
var itemResponse bulkResponseItem

switch {
case item.Update != nil:
itemResponse = *item.Update

sourcePayload.Doc = itemPayloadMarshalled
case item.Delete != nil:
itemResponse = *item.Delete

default:
// Nothing more can be done, we can trust the source to provide valid JSON
sourcePayload.Doc = itemPayload.Bytes()
sdk.Logger(ctx).Warn().Msg("no update or delete details were found in Elasticsearch response")

continue
}

entrySource, err := json.Marshal(sourcePayload)
if err := d.sendAckForOperation(itemResponse); err != nil {
return err
}
}

// Reset buffers
d.recordsBuffer = d.recordsBuffer[:0]
d.ackFuncsBuffer = make(map[string]sdk.AckFunc, d.config.BulkSize)

return nil
}

func (d *Destination) Teardown(context.Context) error {
return nil // No close routine needed
}

func (d *Destination) writeDeleteOperation(key string, data *bytes.Buffer) error {
// actionAndMetadata
entryMetadata, err := json.Marshal(actionAndMetadata{
Delete: &deleteAction{
ID: key,
Index: d.config.Index,
},
})
if err != nil {
return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err)
}

data.Write(entryMetadata)
data.WriteRune('\n')
return nil
}

func (d *Destination) writeUpsertOperation(key string, data *bytes.Buffer, item sdk.Record) error {
// actionAndMetadata
entryMetadata, err := json.Marshal(actionAndMetadata{
Update: &updateAction{
ID: key,
Index: d.config.Index,
RetryOnConflict: 3,
},
})
if err != nil {
return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err)
}

data.Write(entryMetadata)
data.WriteRune('\n')

// optionalSource
sourcePayload := optionalSource{
DocAsUpsert: true,
}

switch itemPayload := item.Payload.(type) {
case sdk.StructuredData:
// Payload is potentially convertible into JSON
itemPayloadMarshalled, err := json.Marshal(itemPayload)
if err != nil {
return fmt.Errorf("failed to prepare data with key=%s: %w", key, err)
}

data.Write(entrySource)
data.WriteRune('\n')
sourcePayload.Doc = itemPayloadMarshalled

default:
// Nothing more can be done, we can trust the source to provide valid JSON
sourcePayload.Doc = itemPayload.Bytes()
}

fmt.Println(data.String())
entrySource, err := json.Marshal(sourcePayload)
if err != nil {
return fmt.Errorf("failed to prepare data with key=%s: %w", key, err)
}

data.Write(entrySource)
data.WriteRune('\n')
return nil
}

func (d *Destination) executeBulkRequest(ctx context.Context, data *bytes.Buffer) (bulkResponse, error) {
if data.Len() < 1 {
sdk.Logger(ctx).Info().Msg("no operations to execute in bulk, skipping")

return bulkResponse{}, nil
}

defer data.Reset()

// Send the bulk request
result, err := d.client.Bulk(bytes.NewReader(data.Bytes()), d.client.Bulk.WithContext(ctx))
if err != nil {
return fmt.Errorf("bulk request failure: %w", err)
return bulkResponse{}, fmt.Errorf("bulk request failure: %w", err)
}
if result.IsError() {
bodyContents, err := io.ReadAll(result.Body)
if err != nil {
return fmt.Errorf("bulk request failure: failed to read the result: %w", err)
return bulkResponse{}, fmt.Errorf("bulk request failure: failed to read the result: %w", err)
}
defer result.Body.Close()

var errorDetails genericError
if err := json.Unmarshal(bodyContents, &errorDetails); err != nil {
return fmt.Errorf("bulk request failure: %s", result.Status())
return bulkResponse{}, fmt.Errorf("bulk request failure: %s", result.Status())
}

return fmt.Errorf("bulk request failure: %s: %s", errorDetails.Error.Type, errorDetails.Error.Reason)
return bulkResponse{}, fmt.Errorf("bulk request failure: %s: %s", errorDetails.Error.Type, errorDetails.Error.Reason)
}

bodyContents, err := io.ReadAll(result.Body)
if err != nil {
return fmt.Errorf("bulk response failure: failed to read the result: %w", err)
return bulkResponse{}, fmt.Errorf("bulk response failure: failed to read the result: %w", err)
}
defer result.Body.Close()

fmt.Println(string(bodyContents))

// Read individual errors
var response bulkResponse
if err := json.Unmarshal(bodyContents, &response); err != nil {
return fmt.Errorf("bulk response failure: could not read the response: %w", err)
return bulkResponse{}, fmt.Errorf("bulk response failure: could not read the response: %w", err)
}

for _, item := range response.Items {
ackFunc, exists := d.ackFuncsBuffer[item.Update.ID]
if !exists {
return fmt.Errorf("bulk response failure: could not ack item with key=%s: no ack function was registered", item.Update.ID)
}

if item.Update.Status >= 200 && item.Update.Status < 300 {
if err := ackFunc(nil); err != nil {
return err
}
return response, nil
}

continue
}
func (d *Destination) sendAckForOperation(itemResponse bulkResponseItem) error {
ackFunc, exists := d.ackFuncsBuffer[itemResponse.ID]
if !exists {
return fmt.Errorf("bulk response failure: could not ack item with key=%s: no ack function was registered", itemResponse.ID)
}

if err := ackFunc(fmt.Errorf(
"item with key=%s upsert failure: [%s] %s: %s",
item.Update.ID,
item.Update.Error.Type,
item.Update.Error.Reason,
item.Update.Error.CausedBy,
)); err != nil {
if itemResponse.Status >= 200 && itemResponse.Status < 300 {
if err := ackFunc(nil); err != nil {
return err
}

return nil
}

// Reset buffers
d.recordsBuffer = d.recordsBuffer[:0]
d.ackFuncsBuffer = make(map[string]sdk.AckFunc, d.config.BulkSize)
var operationError error

if itemResponse.Error == nil {
operationError = fmt.Errorf(
"item with key=%s upsert/delete failure: unknown error",
itemResponse.ID,
)
} else {
operationError = fmt.Errorf(
"item with key=%s upsert/delete failure: [%s] %s: %s",
itemResponse.ID,
itemResponse.Error.Type,
itemResponse.Error.Reason,
itemResponse.Error.CausedBy,
)
}

if err := ackFunc(operationError); err != nil {
return err
}

return nil
}

func (d *Destination) Teardown(context.Context) error {
return nil // No close routine needed
type actionAndMetadata struct {
Update *updateAction `json:"update,omitempty"`
Delete *deleteAction `json:"delete,omitempty"`
}

type actionAndMetadata struct {
Update struct {
ID string `json:"_id"`
Index string `json:"_index"`
RetryOnConflict int `json:"retry_on_conflict"`
} `json:"update"`
type updateAction struct {
ID string `json:"_id"`
Index string `json:"_index"`
RetryOnConflict int `json:"retry_on_conflict"`
}

type deleteAction struct {
ID string `json:"_id"`
Index string `json:"_index"`
}

type optionalSource struct {
@@ -223,29 +311,32 @@ type bulkResponse struct {
Took int `json:"took"`
Errors bool `json:"errors"`
Items []struct {
Update struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Version int `json:"_version,omitempty"`
Result string `json:"result,omitempty"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Failed int `json:"failed"`
} `json:"_shards,omitempty"`
SeqNo int `json:"_seq_no,omitempty"`
PrimaryTerm int `json:"_primary_term,omitempty"`
Status int `json:"status"`
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
CausedBy json.RawMessage `json:"caused_by"`
} `json:"error,omitempty"`
} `json:"update"`
Update *bulkResponseItem `json:"update,omitempty"`
Delete *bulkResponseItem `json:"delete,omitempty"`
} `json:"items"`
}

type bulkResponseItem struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Version int `json:"_version,omitempty"`
Result string `json:"result,omitempty"`
Shards *struct {
Total int `json:"total"`
Successful int `json:"successful"`
Failed int `json:"failed"`
} `json:"_shards,omitempty"`
SeqNo int `json:"_seq_no,omitempty"`
PrimaryTerm int `json:"_primary_term,omitempty"`
Status int `json:"status"`
Error *struct {
Type string `json:"type"`
Reason string `json:"reason"`
CausedBy json.RawMessage `json:"caused_by"`
} `json:"error,omitempty"`
}

type genericError struct {
Error struct {
Type string `json:"type"`
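
Because the Bulk response mirrors the request, each response item now carries either an "update" or a "delete" key, which is why bulkResponseItem is referenced through optional pointers and dispatched with nil checks. The following is a trimmed-down Go sketch of that dispatch, assuming simplified mirrors of the response types (the real structs carry more fields) and an illustrative response body, not one captured from a live cluster.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down mirrors of the response types from this diff (the real structs
// carry more fields); just enough to show the Update/Delete pointer dispatch.
type bulkResponseItem struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
}

type bulkResponse struct {
	Errors bool `json:"errors"`
	Items  []struct {
		Update *bulkResponseItem `json:"update,omitempty"`
		Delete *bulkResponseItem `json:"delete,omitempty"`
	} `json:"items"`
}

func main() {
	// Illustrative response body with one upsert result and one delete result.
	body := []byte(`{"errors":false,"items":[
		{"update":{"_id":"1","status":200}},
		{"delete":{"_id":"2","status":200}}]}`)

	var response bulkResponse
	if err := json.Unmarshal(body, &response); err != nil {
		panic(err)
	}

	// Only one of Update/Delete is set per item; the other stays nil.
	for _, item := range response.Items {
		switch {
		case item.Update != nil:
			fmt.Printf("upsert of %s finished with status %d\n", item.Update.ID, item.Update.Status)
		case item.Delete != nil:
			fmt.Printf("delete of %s finished with status %d\n", item.Delete.ID, item.Delete.Status)
		default:
			fmt.Println("no update or delete details were found for this item")
		}
	}
}
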
4 changes: 2 additions & 2 deletions go.mod
@@ -23,9 +23,9 @@ require (
github.com/rs/zerolog v1.26.1 // indirect
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect