Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove group.id from kafka props and reuse generated group.id #13856

Merged
merged 7 commits into from
Dec 12, 2023

Conversation

tabVersion
Copy link
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Remove unused consumer_group field from KafkaProperties struct
  • Modify group.id format in kafka consumer to include source and actor IDs
  • Remove check for group.id config value
  • Add bytes.per.second and max.num.messages configuration parameters
  • Update new function to accept optional list of columns as parameter
  • Update into_stream function to clone parser config and source context

resolve #13855

may add group.id when scaling and may change most of id when alter

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

remove properties.group.id from kafka source props

- Remove unused `consumer_group` field from `KafkaProperties` struct
- Modify `group.id` format in kafka consumer to include source and actor IDs
- Remove check for `group.id` config value
- Add `bytes.per.second` and `max.num.messages` configuration parameters
- Update `new` function to accept optional list of columns as parameter
- Update `into_stream` function to clone parser config and source context

Signed-off-by: tabVersion <[email protected]>
@tabVersion
Copy link
Contributor Author

Also, the pr also changes the group.id's prefix, please update the whitelist conf accordingly.

"group.id",
format!(
"rw-consumer-{}-{}",
source_ctx.source_info.fragment_id, source_ctx.source_info.actor_id
Copy link
Member

@fuyufjh fuyufjh Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 IIUC, the consumers of multiple parallelism should have same group.id i.e. in the same consumer group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will share the same source id rather than actor id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, actor id is global unique. We can even directly use actor id here.

Copy link
Member

@yufansong yufansong Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use the specific actor id rather than timestamp id, does it means the consume group previrous created will be resue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use the specific actor id rather than timestamp id, does it means the consume group previrous created will be resue?

Yes, the requirement is that a consumer group cannot be shared within parallelism. But the actor ids within the parallelism are unique.

Signed-off-by: tabVersion <[email protected]>
Copy link

codecov bot commented Dec 7, 2023

Codecov Report

Attention: 7 lines in your changes are missing coverage. Please review.

Comparison is base (1d4cac8) 68.08% compared to head (42f8018) 68.13%.
Report is 4 commits behind head on main.

Files Patch % Lines
src/connector/src/source/kafka/source/reader.rs 0.00% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #13856      +/-   ##
==========================================
+ Coverage   68.08%   68.13%   +0.05%     
==========================================
  Files        1533     1533              
  Lines      264675   264666       -9     
==========================================
+ Hits       180194   180340     +146     
+ Misses      84481    84326     -155     
Flag Coverage Δ
rust 68.13% <0.00%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@yufansong yufansong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tabVersion tabVersion enabled auto-merge December 12, 2023 03:52
@tabVersion tabVersion added this pull request to the merge queue Dec 12, 2023
Merged via the queue into main with commit fd3b763 Dec 12, 2023
6 of 7 checks passed
@tabVersion tabVersion deleted the tab/change-group-id branch December 12, 2023 04:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feature request: reuse group.id for kafka source
3 participants