-
Notifications
You must be signed in to change notification settings - Fork 914
KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #1873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
bb27e73
to
91c560a
Compare
This comment has been minimized.
This comment has been minimized.
1 similar comment
This comment has been minimized.
This comment has been minimized.
98bc0bc
to
a1e887a
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on Code part.
'group.protocol': Protocol, | ||
'enable.auto.commit': False, | ||
'auto.offset.reset': 'earliest', | ||
'debug': 'all'} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debug logging.
topic_prefix = "test-topic" | ||
|
||
|
||
def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): | |
def create_consumers(kafka_cluster, topic, group_id, client_id, group_protocol): |
examples/adminapi.py
Outdated
if member.target_assignment: | ||
print(" Target Assignments:") | ||
for toppar in member.target_assignment.topic_partitions: | ||
print(" {} [{}]".format(toppar.topic, toppar.partition)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if member.target_assignment: | |
print(" Target Assignments:") | |
for toppar in member.target_assignment.topic_partitions: | |
print(" {} [{}]".format(toppar.topic, toppar.partition)) | |
if member.target_assignment: | |
print(" Target Assignments:") | |
for toppar in member.target_assignment.topic_partitions: | |
print(f" {toppar.topic} [{toppar.partition)}]" |
src/confluent_kafka/admin/_group.py
Outdated
group_instance_id : str | ||
The instance id of the group member. | ||
""" | ||
|
||
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None): | ||
def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't change the API contract.
def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None): | |
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None): |
src/confluent_kafka/admin/_group.py
Outdated
coordinator: Node | ||
Consumer group coordinator. | ||
authorized_operations: list(AclOperation) | ||
AclOperations allowed for the consumer group. | ||
""" | ||
|
||
def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, | ||
coordinator, authorized_operations=None): | ||
type, coordinator, authorized_operations=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't change API contract.
type, coordinator, authorized_operations=None): | |
coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN): |
src/confluent_kafka/src/Admin.c
Outdated
@@ -3892,6 +3893,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio | |||
|
|||
PyDict_SetItemString(kwargs, "assignment", assignment); | |||
|
|||
c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); | |||
if(c_target_assignment) { | |||
PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- declare at top with
assignment
- Decrease refcount.
c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); | ||
if(c_target_assignment) { | ||
PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); | ||
if (!target_assignment) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check NULL case is working or not. target_assignment should be None in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is working
This comment has been minimized.
This comment has been minimized.
1 similar comment
This comment has been minimized.
This comment has been minimized.
0fd0377
to
8437ffd
Compare
This comment has been minimized.
This comment has been minimized.
8437ffd
to
3cb4080
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
consumer.close() | ||
|
||
|
||
def test_describe_consumer_groups_compatability(kafka_cluster): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def test_describe_consumer_groups_compatability(kafka_cluster): | |
def test_describe_consumer_groups_compatibility(kafka_cluster): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change the test test_describe_operations.py
instead of adding this one. There's a test that is disabled with KIP-848 there.
Here it seems the verify_describe_consumer_groups
test isn't run as it doesn't start with test_
and isn't called by test_describe_consumer_groups_compatibility
.
Better to move this tests to a different PR and merge them for GA.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it. Will raise a seperate PR for that
This comment has been minimized.
This comment has been minimized.
1 similar comment
This comment has been minimized.
This comment has been minimized.
d5b6ed0
to
dd43a7d
Compare
This comment has been minimized.
This comment has been minimized.
|
||
- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856) | ||
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the | |
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) | |
for a complete list of changes, enhancements, fixes and upgrade considerations. |
CHANGELOG.md
Outdated
v2.10.0 is a feature release with the following fixes and enhancements: | ||
|
||
- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856) | ||
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922). | |
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#1873). |
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok for me, reviewing the tests separately. Did some manual tests for the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!.
This PR intends to add support for DescribeConsumerGroup for the new consumer protocol.
The tests will fail till the changes in the librdkafka is merged to master.
Librdkafka PR: [https://github.com/https://github.com/https://github.com/confluentinc/librdkafka/pull/4941](KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups)