feat: Connection for Kafka source & sink #19270
So we just reuse `Source.connection_id`?
Yes, and the same for the schema registry part and the sink.
I think this should be added to `SinkParam` & `SinkDesc` instead, side-by-side with the Sink's `properties`.
Let me clarify the logic for the related proto defs:

- `Sink` (proto/catalog.proto): the definition of a sink in the catalog, for all sinks. We read the props and resolve the connection dependency based on this message. The message contains `SinkFormatDesc`, and we already have a `connection.id` field here. `Sink.properties` contains the ones from the connection catalog.
- `SinkFormatDesc` (proto/catalog.proto): specifies the `format ... encode ...` options (and is contained by the sink catalog). We reference a connection here for the schema registry.
- `SinkDesc` (proto/stream_plan.proto): defines the fields we dispatch to CNs. The props from the connection definition have already been resolved in meta, so there is no need to change anything here.
- `SinkParam` (proto/connector_service.proto): seems to define the transport protocol between the kernel and the connector node (mostly referenced in Java). Ditto: this step is behind the meta node, so no change is needed.
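The layering described above can be sketched with simplified Rust types mirroring the proto messages. All field and type names below are illustrative stand-ins, not the actual generated definitions:

```rust
use std::collections::HashMap;

// Rough mirror of `Sink` (proto/catalog.proto): the catalog entry that the
// frontend/meta read props from and resolve the connection dependency on.
struct SinkCatalog {
    // dependency on a connection catalog entry
    connection_id: Option<u32>,
    // already merged with the props coming from the connection catalog
    properties: HashMap<String, String>,
    format_desc: SinkFormatDesc,
}

// Rough mirror of `SinkFormatDesc` (proto/catalog.proto): the
// `format ... encode ...` options; it may reference its own connection
// for the schema registry.
struct SinkFormatDesc {
    options: HashMap<String, String>,
    connection_id: Option<u32>,
}

// `SinkDesc` (stream_plan) and `SinkParam` (connector_service) sit behind
// the meta node: connection props are already resolved by then, so they
// only carry the flattened properties and need no connection id.
struct ResolvedSinkDesc {
    properties: HashMap<String, String>,
}
```

The point of the sketch is the asymmetry: only the catalog-side messages keep a `connection_id`, while everything dispatched past meta is already flattened.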
I see. So here we expand all the connections into concrete parameters on creation, right? This is okay to me, but in that case, I think the `connection_id` here in `SinkFormatDesc` should not appear either, because all `connection`-related things are already eliminated on creation.

(It seems `SinkFormatDesc::connection_id` was indeed not used anywhere?)
Similar to the source side, the connector part and the schema registry part will have independent connections, just like individual secret refs. We are offering syntax like
Yeah, I know. What I mean is: should it be resolved and eliminated before converting into a proto message?
We resolve the props in the frontend, and the catalog is persisted in meta. We have to reserve a field to keep the connection_id in order to maintain the dependency. And to keep the design simple and aligned with secret refs, I think adding a `connection.id` for each connector and schema registry is the best practice.

Also, we are going to support `alter connection` and apply the changes to all related sources/sinks. Eliminating the `connection.id` from the catalog would make that step harder.
I see. You keep both the `connection_id` and the resolved connection arguments in these message structures, right? It's acceptable to me but a bit counter-intuitive.

IIRC, a secret ref doesn't keep the resolved plaintext secret at all; it always resolves whenever the secret is used.
Oh, there is some gap on "resolve" here. You can refer to `resolve_connection_ref_and_secret_ref`: resolve here means extracting `xxx = secret <secret name>` from the WITH props into a `PbSecretRef (secret_id)`. We will do the same op for secrets coming from both the connection catalog and the WITH props.
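A minimal sketch of this resolve step, assuming a simplified shape for the real `resolve_connection_ref_and_secret_ref` (the function name, signature, and `SecretRef` type here are hypothetical):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PbSecretRef`: only the secret id is kept,
/// never the plaintext value.
#[derive(Debug, PartialEq)]
struct SecretRef {
    secret_id: u32,
}

/// Sketch of "resolve": split props into plain string props and secret
/// refs, looking each `secret <name>` value up in the secret catalog.
/// The same pass applies to props from a connection catalog and from the
/// WITH clause.
fn resolve_secret_refs(
    props: HashMap<String, String>,
    secret_catalog: &HashMap<String, u32>, // secret name -> secret id
) -> (HashMap<String, String>, HashMap<String, SecretRef>) {
    let mut plain = HashMap::new();
    let mut secret_refs = HashMap::new();
    for (key, value) in props {
        // `xxx = secret <secret name>` becomes a ref holding the secret id.
        if let Some(name) = value.strip_prefix("secret ") {
            if let Some(&secret_id) = secret_catalog.get(name.trim()) {
                secret_refs.insert(key, SecretRef { secret_id });
                continue;
            }
        }
        plain.insert(key, value);
    }
    (plain, secret_refs)
}
```

This mirrors the point made above about secret refs: the resolved output never contains the plaintext secret, only its id.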
This is kind of a weakly-typed implementation, as the properties are kept as a `map<string, string>` here.

On the contrary, we could optionally make it strongly-typed by defining the connection's fields as a `message` (such as `message KafkaConnection`), with the corresponding struct (such as `struct KafkaConnectionInner`) substituted accordingly.

I tend to prefer the latter, but I am not sure how to handle secrets. Comments are welcome.
I had this idea before. If we go with a concrete type here, we have to accept a more complex impl when handling `create source/sink/table`, i.e. we have to know whether an attr comes from the WITH clause or the connection catalog and make sure the two don't collide. Besides, secret refs are a problem.

The current impl avoids the first problem by merging everything into a hashmap, keeping the original `create source/sink/table` path unchanged.
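The map-merge approach described here can be sketched as follows; the function name and the choice to reject collisions outright are assumptions for illustration, not the actual implementation:

```rust
use std::collections::HashMap;

/// Sketch of the weakly-typed merge: connection-catalog props and
/// WITH-clause props are combined into one map before the usual
/// `create source/sink/table` path runs. A key supplied by both sides
/// is reported as a collision rather than silently overwritten.
fn merge_connection_props(
    with_props: &HashMap<String, String>,
    connection_props: &HashMap<String, String>,
) -> Result<HashMap<String, String>, String> {
    let mut merged = with_props.clone();
    for (key, value) in connection_props {
        if merged.contains_key(key) {
            return Err(format!(
                "option `{key}` is set in both the WITH clause and the connection"
            ));
        }
        merged.insert(key.clone(), value.clone());
    }
    Ok(merged)
}
```

With a strongly-typed `KafkaConnection` message, this collision check would instead have to be written per field, which is the extra complexity the comment above refers to.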
No need to keep the `oneof` now. We may remove it to improve readability.
We are still using the proto in meta backup. Removing the `oneof` might break compatibility.
Well, then let's place `connection_params` outside the `oneof` at least.
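The proposed layout can be sketched with a Rust mirror of the proto shapes, using an enum as the stand-in for the `oneof`. Variant and field names here are illustrative only, not the actual proto definitions:

```rust
use std::collections::HashMap;

// Stand-in for the legacy `oneof`: its old variants are kept so that
// proto messages persisted in meta backups still decode.
#[derive(Debug)]
enum LegacyConnectionInfo {
    PrivateLinkService(String), // illustrative legacy variant
}

// The new `connection_params` lives outside the oneof as an ordinary
// optional field, so it never competes with the legacy variants and old
// backups remain readable.
struct Connection {
    legacy_info: Option<LegacyConnectionInfo>,
    connection_params: Option<HashMap<String, String>>,
}
```

Keeping the legacy variants while adding new data as a sibling field is the usual way to evolve a `oneof` without breaking previously serialized messages.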