Skip to content

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #1873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 17, 2025

Conversation

PratRanj07
Copy link
Contributor

@PratRanj07 PratRanj07 commented Dec 17, 2024

This PR intends to add support for DescribeConsumerGroup for the new consumer protocol.
The tests will fail till the changes in the librdkafka is merged to master.
Librdkafka PR: [https://github.com/https://github.com/https://github.com/confluentinc/librdkafka/pull/4941](KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups)

@PratRanj07 PratRanj07 requested review from a team as code owners December 17, 2024 20:31
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_support_describegroup branch 2 times, most recently from bb27e73 to 91c560a Compare April 16, 2025 18:15
@PratRanj07 PratRanj07 changed the base branch from master to dev_kip848-preview-push April 16, 2025 18:16
@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent

This comment has been minimized.

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_support_describegroup branch from 98bc0bc to a1e887a Compare April 17, 2025 05:37
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on Code part.

'group.protocol': Protocol,
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
'debug': 'all'}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove debug logging.

topic_prefix = "test-topic"


def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol):
def create_consumers(kafka_cluster, topic, group_id, client_id, group_protocol):

Comment on lines 552 to 555
if member.target_assignment:
print(" Target Assignments:")
for toppar in member.target_assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if member.target_assignment:
print(" Target Assignments:")
for toppar in member.target_assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
if member.target_assignment:
print(" Target Assignments:")
for toppar in member.target_assignment.topic_partitions:
print(f" {toppar.topic} [{toppar.partition)}]"

group_instance_id : str
The instance id of the group member.
"""

def __init__(self, member_id, client_id, host, assignment, group_instance_id=None):
def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't change the API contract.

Suggested change
def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None):
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None):

coordinator: Node
Consumer group coordinator.
authorized_operations: list(AclOperation)
AclOperations allowed for the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
coordinator, authorized_operations=None):
type, coordinator, authorized_operations=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't change API contract.

Suggested change
type, coordinator, authorized_operations=None):
coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN):

@@ -3892,6 +3893,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio

PyDict_SetItemString(kwargs, "assignment", assignment);

c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member);
if(c_target_assignment) {
PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • declare at top with assignment
  • Decrease refcount.

c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member);
if(c_target_assignment) {
PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment);
if (!target_assignment) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check NULL case is working or not. target_assignment should be None in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is working

@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent

This comment has been minimized.

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_support_describegroup branch from 0fd0377 to 8437ffd Compare April 17, 2025 08:56
@sonarqube-confluent

This comment has been minimized.

Base automatically changed from dev_kip848-preview-push to master April 17, 2025 10:03
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_support_describegroup branch from 8437ffd to 3cb4080 Compare April 17, 2025 10:14
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

consumer.close()


def test_describe_consumer_groups_compatability(kafka_cluster):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_describe_consumer_groups_compatability(kafka_cluster):
def test_describe_consumer_groups_compatibility(kafka_cluster):

Copy link
Contributor

@emasab emasab Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change the test test_describe_operations.py instead of adding this one. There's a test that is disabled with KIP-848 there.
Here it seems the verify_describe_consumer_groups test isn't run as it doesn't start with test_ and isn't called by test_describe_consumer_groups_compatibility.

Better to move this tests to a different PR and merge them for GA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it. Will raise a seperate PR for that

@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent

This comment has been minimized.

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_support_describegroup branch from d5b6ed0 to dd43a7d Compare April 17, 2025 12:20
@sonarqube-confluent

This comment has been minimized.


- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856)
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

CHANGELOG.md Outdated
v2.10.0 is a feature release with the following fixes and enhancements:

- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856)
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922).
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#1873).

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent
Copy link

Passed

Analysis Details

0 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 0 Code Smells

Coverage and Duplications

  • Coverage 12.50% Coverage (62.00% Estimated after merge)
  • Duplications No duplication information (1.00% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

Copy link
Contributor

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok for me, reviewing the tests separately. Did some manual tests for the moment.

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!.

@PratRanj07 PratRanj07 merged commit 5a1f136 into master Apr 17, 2025
3 checks passed
@PratRanj07 PratRanj07 deleted the dev_add_support_describegroup branch April 17, 2025 14:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants