Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(destination): add destination connector #1

Merged
merged 54 commits into from
Jun 8, 2022

Conversation

donatorsky
Copy link
Contributor

Description

Adds destination handling to Elasticsearch connector.

@donatorsky donatorsky added the enhancement New feature or request label Apr 28, 2022
@donatorsky donatorsky self-assigned this Apr 28, 2022
@donatorsky donatorsky force-pushed the feat/destination-connector branch 4 times, most recently from bdacc7c to e33d61a Compare May 4, 2022 14:22
@donatorsky donatorsky marked this pull request as draft May 5, 2022 12:13
@donatorsky donatorsky added the good first issue Good for newcomers label May 6, 2022
@donatorsky donatorsky marked this pull request as ready for review May 6, 2022 14:20
@donatorsky donatorsky force-pushed the feat/destination-connector branch 4 times, most recently from 2c75a34 to 2e3be3f Compare May 16, 2022 11:00
@donatorsky donatorsky force-pushed the feat/destination-connector branch from c3d333c to 26c3287 Compare May 17, 2022 10:33
Copy link
Contributor

@hariso hariso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!

test/v5/connector_test.go Show resolved Hide resolved

func ParseConfig(cfgRaw map[string]string) (_ Config, err error) {
cfg := Config{
Version: cfgRaw[ConfigKeyVersion],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that ES itself reports its version (when you go to localhost:9200 for example), do we need the user to manually enter this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do, since it is not always available immediately. The server can require the client to be authorized first. In such case, I would need to use some version of the client, see if it connects, get the version and then use the proper version of the client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. However, the request to localhost:9200 is like any other request. It can fail only if the config is missing credentials, in which case the connector is not able to work anyway. In other words, we have two possibilties:

  1. ES server requires auth, but the connector config doesn't have it: We will ping localhost:9200, get a 401 code back, and will let the user know that the config is invalid (missing creds).
  2. ES doesn't require auth, the connector config doesn't have it: all good
  3. ES requires auth, connector config has it: pinging the host to get the version will work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming e.g. ES SDK v8 (and future versions) can authorize ES server v5 (or any previous version in general). A similar thing happened to MySQL connector for PHP after MySQL 8.0 was released, and they changed the default auth plugin (https://stackoverflow.com/questions/49083573/php-7-2-2-mysql-8-0-pdo-gives-authentication-method-unknown-to-the-client-ca) which lead to kind of false-positive where user+password are correct, but additional configuration parameter is required.

However, I think we are good to go with this. I see it currently as a rare case, since ES is pretty much robust with their changes (they either do not change already well working features or announce changes early enough). I've created an issue for that #4.

destination/destination.go Show resolved Hide resolved
Comment on lines 110 to 120
// Sort operations to ensure the order
d.operationsQueue.Sort()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great that you thought about this case. I have a question: the purpose of this sort is to make sure that operations come in order (e.g. that we don't have a delete before a create). However, even this sort cannot guarantee that since it's local. We may see out-of-order records coming in the batch before this one or after this one. For example:
batch 1: record deleted at 10 AM
batch 2: record created at 9 AM
Simply changing the batch size changes the effect of this sort. And since we can't guarantee the effect of it, maybe we shouldn't do it. WDYT?

Copy link
Contributor Author

@donatorsky donatorsky May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, is to ensure the order of events as much as it is possible, but still there may be an edge case when two consecutive events were sent, but the second got earlier, filled the buffer and the change was applied.

I'm afraid there is not much to do about this. If we remove sorting logic, then we can get a much bigger mess where the situation You described happens more often. Increasing the buffer size and sorting minimizes this, on average. We can also drop last n records when applying changes and save them for the next batch, but we don't know if the missing record is last or first.
A possible "solution" (workaround, but still not perfect) for that is to implement in the Conduit something similar to TCP/IP packets, where they are numbered and when client gets response no.1 and no.3, still waits for no.2 (e.g. some BatchID attached to Record). But even then:

  • There is no guarantee that no.2 will ever come, and a pipeline will be stuck. Timeouts, if added, will become a real pain when this happens frequently.
  • There is no guarantee that source connector received data in order. It would have to be implemented by actual source.

The order does not always matter. When Record.Key is not set, then records are always created. This also solves it for certain use cases. For that, I think it would be useful to have a dedicated config "Always create" to disable upsert logic event when Record.Key is available.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that you're saying. Let's add a comment to that line above, briefly explaining why we do that. If we ever start seeing the mentioned edge cases or performance degradation, we can work on it then, it's probably too early to work on it now anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why are we even concerned about records arriving out of order? Conduit guarantees message order, so unless the source produces the records out of order I don't see why we would need to sort them. Even if that is the case, it sounds like a bug that needs to be fixed by the source and not something the destination should address.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donatorsky IMHO the point @lovromazgon made is a good one. Given that the sort is running always (so there's always some performance penalty), but that it's not a 100% effective solution (due to batches) and that another condition is that sources messed something up (because as Lovro said, Conduit itself guarantees message order), it looks like we shouldn't be doing it. What are your thoughts on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In your example you say that those two events were sent at those times. With that, you mean issued by Salesforce?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, issued by the Salesforce, and they have these dates in the payload:
1st: Sent at 15:30:01 with payload {"created_at":"yyyy-mm-dd 15:30:01"}, received 15:30:12.
2nd: Sent at 15:30:02 with payload {"created_at":"yyyy-mm-dd 15:30:02"}, received 15:30:11.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you are saying that the Salesforce source connector can create records in an order that is different from the order of events as they hapoened in Salesforce. This sounds like a bug in the source connector to me, definitely not something the destination should care about, otherwise we would need to include this logic in each and every destination connector, just in case.

You are saying that Conduit manages the order of messages received (i.e. HTTP requests), but not the actual order of messages specified in the payload.

What I am trying to say is that the source connector is completely in charge of creating the records in whichever order it chooses. Conduit will make sure that the order remains the same all the way to the destination, so if the source connector produces record A and then record B, the destination will receive record A and then record B.

To sum up, the source connector has more information about the records it creates than any other component in the pipeline, so it is the best place to decide the correct order of records.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 If, for any reason, sorting is needed, then the source connector sounds like a better place for that. Plus, sorting will be done even when it's not needed, which affects performance (even if small) for no reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated: d7b7145

internal/elasticsearch/factory_test.go Show resolved Hide resolved
@conduitio-labs conduitio-labs deleted a comment from hariso May 20, 2022
@conduitio-labs conduitio-labs deleted a comment from hariso May 20, 2022
@conduitio-labs conduitio-labs deleted a comment from hariso May 20, 2022
@conduitio-labs conduitio-labs deleted a comment from hariso May 20, 2022
@donatorsky
Copy link
Contributor Author

@hariso Some comments appeared twice, I removed duplicates 🙂

@hariso
Copy link
Contributor

hariso commented May 20, 2022

@hariso Some comments appeared twice, I removed duplicates slightly_smiling_face

😕

README.md Outdated Show resolved Hide resolved
@donatorsky donatorsky force-pushed the feat/destination-connector branch from f50e26f to 737256c Compare May 31, 2022 07:25
)
} else {
d.operationsQueue[n].err = fmt.Errorf(
"item with key=%s create/upsert/delete failure: [%s] %s: %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to know which operation exactly was it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can get it from the switch statement a few lines earlier. Will add this 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet 👍

// Execute operations
retriesLeft := d.config.Retries

for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This for loop is for retries, right?

Not a blocker for this PR, but it would nice to split this whole method into smaller ones, and make the retry logic stand out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #7 for that

Comment on lines 44 to 49
actionCreate = "create"
actionCreated = "created"
actionUpdate = "update"
actionUpdated = "updated"
actionDelete = "delete"
actionDeleted = "deleted"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both flavors (created and created, etc.)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems not to be standardized, so I wanted to support as many reasonable possibilities as possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean, not standardized in the ES API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, these are values from the Record's metadata["action"] field. So it actually depends on creators of source connectors 😉 I've seen you use action=delete for S3 and insert, update, delete for Postgres while Salesforce uses created, updated etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. We're working on getting standardized through OpenCDC. So it would be good to mention the reason for the duplication in a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donatorsky Is there a special reason why the Salesforce connector uses the "d" version? The S3 and Pg connectors are in line with the OpenCDC guidelines here: https://github.com/ConduitIO/conduit/blob/main/docs/design-documents/20220309-opencdc.md?plain=1#L267-L272

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is because I get "d" version directly from the Salesforce in query response.

From this PR point of view, it seems I should drop "d" versions, go with OpenCDC guidelines and update Salesforce connector to translate SF ⇒ OpenCDC actions. Should I proceed with these steps?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donatorsky That's right, I'd go with that. Internally, connectors are free to do what they want, but the input and output should match the Conduit APIs and the OpenCDC guidelines (which are work in progress obviously and will be refined over time).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated: 9fe58b3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 34 to 41
// PrepareCreateOperation prepares insert operation definition
PrepareCreateOperation(item sdk.Record) (metadata interface{}, payload interface{}, err error)

// PrepareUpsertOperation prepares upsert operation definition
PrepareUpsertOperation(key string, item sdk.Record) (metadata interface{}, payload interface{}, err error)

// PrepareDeleteOperation prepares delete operation definition
PrepareDeleteOperation(key string) (metadata interface{}, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are bulk operations right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These create a single change definition for bulk query. The bulk query itself is composed of these later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, I guessed so. It would be nice to have it in the name, or the comment at the very least.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Underlying implementation differs between ES versions, same reason as for here:
#1 (comment)

It is definitely missing comment that it is for bulk, despite the fact that only bulk operation is supported by the connector. I'm adding this 🙂

test/v5/connector_test.go Show resolved Hide resolved
// See the License for the specific language governing permissions and
// limitations under the License.

package v7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a difference between the requests (and responses) across different versions, especially v5-v7?

If there are no changes, it looks like we can take advantage of that, and simplify the client code. v8 is the one being actively maintained, so if v5-v7 are the same, they will stay the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are different, unfortunately :/ v6 introduces RetryOnConflict int for update action and v7 removes support for indices' types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad!

spec.go Outdated
destination.ConfigKeyVersion: {
Default: "",
Required: true,
Description: "The version of the Elasticsearch service.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to mention what are the possible values (and in which format, e.g. is it 8 or v8). It is mentioned in the ReadMe (which is great!) but these specs will be used to automatically populate options in a UI widget.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, added 🙂

version: '3.9'

services:
kibana:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this Kibana instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is strictly for local development. I was going to exclude docker-compose*.overrides.yml but decided to leave it, maybe it will help someone in the future.

When running tests, Kibana is not started. Only locally, when explicitly defined.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice thing, so maybe you can mention it in the ReadMe (i.e. how to start Kibana for local dev).

Copy link
Contributor

@hariso hariso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we're good to go with this PR, thanks all of the good work! I can't find all the comments, but three things are outstanding:

  1. Comment about actions (created, create etc.)
  2. Sorting records
  3. Splitting that method (with the retry logic)

@donatorsky donatorsky merged commit a7bec5e into main Jun 8, 2022
@donatorsky donatorsky deleted the feat/destination-connector branch June 8, 2022 08:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request good first issue Good for newcomers
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants