feat(destination): add destination connector #1
Conversation
force-pushed from bdacc7c to e33d61a
force-pushed from 2c75a34 to 2e3be3f
force-pushed from c3d333c to 26c3287
Nice work!
func ParseConfig(cfgRaw map[string]string) (_ Config, err error) {
	cfg := Config{
		Version: cfgRaw[ConfigKeyVersion],
Given that ES itself reports its version (when you go to localhost:9200 for example), do we need the user to manually enter this?
We do, since it is not always available immediately. The server can require the client to be authorized first. In such a case, I would need to try some version of the client, see if it connects, get the version, and then use the proper version of the client.
Gotcha. However, the request to localhost:9200 is like any other request. It can fail only if the config is missing credentials, in which case the connector is not able to work anyway. In other words, we have three possibilities:
- ES server requires auth, but the connector config doesn't have it: We will ping localhost:9200, get a 401 code back, and will let the user know that the config is invalid (missing creds).
- ES doesn't require auth, the connector config doesn't have it: all good
- ES requires auth, connector config has it: pinging the host to get the version will work.
Assuming, e.g., that ES SDK v8 (and future versions) can authorize against ES server v5 (or any previous version in general). A similar thing happened to the MySQL connector for PHP after MySQL 8.0 was released and the default auth plugin changed (https://stackoverflow.com/questions/49083573/php-7-2-2-mysql-8-0-pdo-gives-authentication-method-unknown-to-the-client-ca), which led to a kind of false positive where user+password are correct, but an additional configuration parameter is required.
However, I think we are good to go with this. I currently see it as a rare case, since ES is pretty robust with their changes (they either do not change already well-working features or announce changes early enough). I've created issue #4 for that.
destination/destination.go
Outdated
// Sort operations to ensure the order
d.operationsQueue.Sort()
It's great that you thought about this case. I have a question: the purpose of this sort is to make sure that operations come in order (e.g. that we don't have a delete before a create). However, even this sort cannot guarantee that since it's local. We may see out-of-order records coming in the batch before this one or after this one. For example:
batch 1: record deleted at 10 AM
batch 2: record created at 9 AM
Simply changing the batch size changes the effect of this sort. And since we can't guarantee the effect of it, maybe we shouldn't do it. WDYT?
That's correct, it is there to ensure the order of events as much as possible, but there may still be an edge case where two consecutive events were sent, the second arrived earlier, filled the buffer, and its change was applied first.
I'm afraid there is not much to do about this. If we remove the sorting logic, we can get a much bigger mess, where the situation you described happens more often. Increasing the buffer size and sorting minimizes this, on average. We could also drop the last n records when applying changes and save them for the next batch, but we don't know whether the missing record is last or first.
A possible "solution" (a workaround, and still not perfect) would be to implement in Conduit something similar to TCP/IP packets, where they are numbered, and when the client gets responses no. 1 and no. 3, it still waits for no. 2 (e.g. some BatchID attached to Record). But even then:
- There is no guarantee that no. 2 will ever come, and the pipeline will be stuck. Timeouts, if added, become a real pain when this happens frequently.
- There is no guarantee that the source connector received data in order. That would have to be implemented by the actual source.
The order does not always matter. When Record.Key is not set, records are always created, which also solves it for certain use cases. For that reason, I think it would be useful to have a dedicated "Always create" config option to disable the upsert logic even when Record.Key is available.
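To make the limitation concrete, here is a minimal sketch (with a simplified, hypothetical `record` type standing in for `sdk.Record`) of why a per-batch sort cannot fix cross-batch inversions: each batch sorts fine internally, yet the stale create in batch 2 is still applied after the delete in batch 1.

```go
package main

import (
	"fmt"
	"sort"
)

// record is a simplified stand-in for sdk.Record: only the fields relevant here.
type record struct {
	Key       string
	Action    string
	CreatedAt string // e.g. "10:00"; sortable as a string in this toy format
}

// sortBatch orders a single batch by event time; cross-batch inversions remain.
func sortBatch(batch []record) {
	sort.SliceStable(batch, func(i, j int) bool {
		return batch[i].CreatedAt < batch[j].CreatedAt
	})
}

func main() {
	// The delete (10:00) arrives in batch 1, the earlier create (09:00) in batch 2.
	batch1 := []record{{Key: "a", Action: "delete", CreatedAt: "10:00"}}
	batch2 := []record{{Key: "a", Action: "create", CreatedAt: "09:00"}}

	sortBatch(batch1)
	sortBatch(batch2)

	// Batches are still applied in arrival order, so the stale create
	// resurrects the deleted record regardless of the per-batch sort.
	fmt.Println(batch1[0].Action, "then", batch2[0].Action) // delete then create
}
```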
I agree with what you're saying. Let's add a comment to that line above, briefly explaining why we do it. If we ever start seeing the mentioned edge cases or performance degradation, we can work on it then; it's probably too early to work on it now anyway.
I'm curious why we are even concerned about records arriving out of order. Conduit guarantees message order, so unless the source produces the records out of order, I don't see why we would need to sort them. Even if that is the case, it sounds like a bug that needs to be fixed by the source, not something the destination should address.
@donatorsky IMHO the point @lovromazgon made is a good one. Given that the sort always runs (so there's always some performance penalty), that it's not a 100% effective solution (due to batches), and that it only matters if a source messed something up (because, as Lovro said, Conduit itself guarantees message order), it looks like we shouldn't be doing it. What are your thoughts on this?
In your example you say that those two events were sent at those times. By that, do you mean issued by Salesforce?
That's correct, issued by Salesforce, and they have these dates in the payload:
1st: sent at 15:30:01 with payload {"created_at":"yyyy-mm-dd 15:30:01"}, received at 15:30:12.
2nd: sent at 15:30:02 with payload {"created_at":"yyyy-mm-dd 15:30:02"}, received at 15:30:11.
IIUC you are saying that the Salesforce source connector can create records in an order that is different from the order of events as they happened in Salesforce. This sounds like a bug in the source connector to me, and definitely not something the destination should care about; otherwise we would need to include this logic in each and every destination connector, just in case.
You are saying that Conduit manages the order of messages received (i.e. HTTP requests), but not the actual order of messages specified in the payload.
What I am trying to say is that the source connector is completely in charge of creating the records in whichever order it chooses. Conduit will make sure that the order remains the same all the way to the destination, so if the source connector produces record A and then record B, the destination will receive record A and then record B.
To sum up, the source connector has more information about the records it creates than any other component in the pipeline, so it is the best place to decide the correct order of records.
💯 If, for any reason, sorting is needed, then the source connector sounds like a better place for it. Plus, sorting would be done even when it's not needed, which affects performance (even if only slightly) for no reason.
Updated: d7b7145
@hariso Some comments appeared twice, I removed duplicates 🙂
😕
force-pushed from f50e26f to 737256c
destination/destination.go
Outdated
)
} else {
	d.operationsQueue[n].err = fmt.Errorf(
		"item with key=%s create/upsert/delete failure: [%s] %s: %s",
Is it possible to know which operation exactly it was?
Yes, I can get it from the switch statement a few lines earlier. Will add this 🙂
Sweet 👍
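The change agreed on above (carrying the concrete operation out of the switch into the error message) could look roughly like this. All names here are illustrative, not the connector's actual code:

```go
package main

import "fmt"

// buildItemError formats a per-item bulk failure, naming the specific
// operation (create/upsert/delete) that was attempted, instead of the
// generic "create/upsert/delete" placeholder.
func buildItemError(key, operation, errType, reason string) error {
	return fmt.Errorf("item with key=%s %s failure: [%s]: %s", key, operation, errType, reason)
}

func main() {
	// operation would be captured in the switch statement a few lines earlier.
	err := buildItemError("user-1", "upsert", "version_conflict_engine_exception", "document already exists")
	fmt.Println(err) // item with key=user-1 upsert failure: [version_conflict_engine_exception]: document already exists
}
```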
// Execute operations
retriesLeft := d.config.Retries

for {
This for loop is for retries, right?
Not a blocker for this PR, but it would be nice to split this whole method into smaller ones and make the retry logic stand out.
Created #7 for that
destination/destination.go
Outdated
actionCreate  = "create"
actionCreated = "created"
actionUpdate  = "update"
actionUpdated = "updated"
actionDelete  = "delete"
actionDeleted = "deleted"
Why do we need both flavors (create and created, etc.)?
It seems not to be standardized, so I wanted to support as many reasonable possibilities as possible.
You mean, not standardized in the ES API?
No, these are values from the Record's metadata["action"] field. So it actually depends on the creators of the source connectors 😉 I've seen you use action=delete for S3, and insert, update, delete for Postgres, while Salesforce uses created, updated, etc.
Gotcha. We're working on getting this standardized through OpenCDC. So it would be good to mention the reason for the duplication in a comment.
@donatorsky Is there a special reason why the Salesforce connector uses the "d" version? The S3 and Pg connectors are in line with the OpenCDC guidelines here: https://github.com/ConduitIO/conduit/blob/main/docs/design-documents/20220309-opencdc.md?plain=1#L267-L272
It is because I get the "d" version directly from Salesforce in the query response.
From this PR's point of view, it seems I should drop the "d" versions, go with the OpenCDC guidelines, and update the Salesforce connector to translate SF ⇒ OpenCDC actions. Should I proceed with these steps?
@donatorsky That's right, I'd go with that. Internally, connectors are free to do what they want, but the input and output should match the Conduit APIs and the OpenCDC guidelines (which are work in progress obviously and will be refined over time).
Updated: 9fe58b3
And Salesforce counterpart:
conduitio-labs/conduit-connector-salesforce@f1d0dec
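The agreed direction (drop the "d" variants in the destination and translate source-specific actions to the OpenCDC ones) could be sketched as follows. The mapping reflects only the variants mentioned in this thread (Salesforce's past tense, Postgres' insert), not an exhaustive standard, and `normalizeAction` is a hypothetical helper:

```go
package main

import "fmt"

// normalizeAction translates source-specific action names from
// metadata["action"] to the OpenCDC-style actions the destination handles.
// Unknown actions are reported so the caller can decide how to handle them.
func normalizeAction(action string) (string, bool) {
	switch action {
	case "create", "created", "insert":
		return "create", true
	case "update", "updated":
		return "update", true
	case "delete", "deleted":
		return "delete", true
	default:
		return "", false
	}
}

func main() {
	for _, a := range []string{"created", "insert", "deleted", "noop"} {
		norm, ok := normalizeAction(a)
		fmt.Println(a, "=>", norm, ok)
	}
}
```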
internal/elasticsearch/client.go
Outdated
// PrepareCreateOperation prepares insert operation definition
PrepareCreateOperation(item sdk.Record) (metadata interface{}, payload interface{}, err error)

// PrepareUpsertOperation prepares upsert operation definition
PrepareUpsertOperation(key string, item sdk.Record) (metadata interface{}, payload interface{}, err error)

// PrepareDeleteOperation prepares delete operation definition
PrepareDeleteOperation(key string) (metadata interface{}, err error)
These are bulk operations right?
These create a single change definition for the bulk query. The bulk query itself is composed of these later.
Gotcha, I guessed so. It would be nice to have it in the name, or in the comment at the very least.
The underlying implementation differs between ES versions, for the same reason as here:
#1 (comment)
It is definitely missing a comment saying that it is for bulk, despite the fact that only the bulk operation is supported by the connector. I'm adding this 🙂
// See the License for the specific language governing permissions and
// limitations under the License.

package v7
Is there a difference between the requests (and responses) across different versions, especially v5-v7?
If there are no changes, it looks like we can take advantage of that and simplify the client code. v8 is the one being actively maintained, so if v5-v7 are the same now, they will stay the same.
They are different, unfortunately :/ v6 introduces RetryOnConflict int for the update action, and v7 removes support for index types.
Too bad!
spec.go
Outdated
destination.ConfigKeyVersion: {
	Default:     "",
	Required:    true,
	Description: "The version of the Elasticsearch service.",
It would be nice to mention what the possible values are (and in which format, e.g. is it 8 or v8). It is mentioned in the ReadMe (which is great!), but these specs will be used to automatically populate options in a UI widget.
Gotcha, added 🙂
version: '3.9'

services:
  kibana:
What's the purpose of this Kibana instance?
It is strictly for local development. I was going to exclude docker-compose*.overrides.yml but decided to leave it; maybe it will help someone in the future.
When running tests, Kibana is not started. It runs only locally, when explicitly defined.
That's a nice thing, so maybe you can mention it in the ReadMe (i.e. how to start Kibana for local dev).
I believe we're good to go with this PR, thanks for all of the good work! I can't find all the comments, but three things are outstanding:
- Comment about actions (created, create etc.)
- Sorting records
- Splitting that method (with the retry logic)
Description
Adds destination handling to Elasticsearch connector.