-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: remove group.id
from kafka props and reuse generated group.id
#13856
Conversation
- Remove unused `consumer_group` field from `KafkaProperties` struct - Modify `group.id` format in kafka consumer to include source and actor IDs - Remove check for `group.id` config value - Add `bytes.per.second` and `max.num.messages` configuration parameters - Update `new` function to accept optional list of columns as parameter - Update `into_stream` function to clone parser config and source context Signed-off-by: tabVersion <[email protected]>
Also, the pr also changes the |
"group.id", | ||
format!( | ||
"rw-consumer-{}-{}", | ||
source_ctx.source_info.fragment_id, source_ctx.source_info.actor_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 IIUC, the consumers of multiple parallelism should have same group.id
i.e. in the same consumer group?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They will share the same source id rather than actor id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a second thought, actor id is global unique. We can even directly use actor id here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use the specific actor id rather than timestamp id, does it means the consume group previrous created will be resue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use the specific actor id rather than timestamp id, does it means the consume group previrous created will be resue?
Yes, the requirement is that a consumer group cannot be shared within parallelism. But the actor ids within the parallelism are unique.
Signed-off-by: tabVersion <[email protected]>
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #13856 +/- ##
==========================================
+ Coverage 68.08% 68.13% +0.05%
==========================================
Files 1533 1533
Lines 264675 264666 -9
==========================================
+ Hits 180194 180340 +146
+ Misses 84481 84326 -155
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
consumer_group
field fromKafkaProperties
structgroup.id
format in kafka consumer to include source and actor IDsgroup.id
config valuebytes.per.second
andmax.num.messages
configuration parametersnew
function to accept optional list of columns as parameterinto_stream
function to clone parser config and source contextresolve #13855
may add
group.id
when scaling and may change most of id whenalter
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
remove
properties.group.id
from kafka source props