From 7fe5b9046edf8a0818c89733e681b4103e28a8d3 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 18 Dec 2024 01:41:56 +0530 Subject: [PATCH 01/12] Added support for describe consumer group for new protocol --- examples/adminapi.py | 5 + src/confluent_kafka/admin/_group.py | 11 +- src/confluent_kafka/src/Admin.c | 12 ++ ..._describe_consumer_groups_compatability.py | 134 ++++++++++++++++++ 4 files changed, 160 insertions(+), 2 deletions(-) create mode 100644 tests/integration/admin/test_describe_consumer_groups_compatability.py diff --git a/examples/adminapi.py b/examples/adminapi.py index 54f119e02..29030b0de 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args): print("Group Id: {}".format(g.group_id)) print(" Is Simple : {}".format(g.is_simple_consumer_group)) print(" State : {}".format(g.state)) + print(" Type : {}".format(g.type)) print(" Partition Assignor : {}".format(g.partition_assignor)) print( f" Coordinator : {g.coordinator}") @@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args): print(" Assignments :") for toppar in member.assignment.topic_partitions: print(" {} [{}]".format(toppar.topic, toppar.partition)) + if member.target_assignment: + print(" Target Assignments:") + for toppar in member.target_assignment.topic_partitions: + print(" {} [{}]".format(toppar.topic, toppar.partition)) if (include_auth_ops): print(" Authorized operations: ") op_string = "" diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 964d62b2f..1f7882cc4 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -94,15 +94,18 @@ class MemberDescription: The host where the group member is running. assignment: MemberAssignment The assignment of the group member + target_assignment: MemberAssignment + The target assignment of the group member group_instance_id : str The instance id of the group member. 
""" - def __init__(self, member_id, client_id, host, assignment, group_instance_id=None): + def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None): self.member_id = member_id self.client_id = client_id self.host = host self.assignment = assignment + self.target_assignment = target_assignment self.group_instance_id = group_instance_id @@ -123,6 +126,8 @@ class ConsumerGroupDescription: Partition assignor. state : ConsumerGroupState Current state of the consumer group. + type : ConsumerGroupType + Type of the consumer group. coordinator: Node Consumer group coordinator. authorized_operations: list(AclOperation) @@ -130,7 +135,7 @@ class ConsumerGroupDescription: """ def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - coordinator, authorized_operations=None): + type, coordinator, authorized_operations=None): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members @@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign self.partition_assignor = partition_assignor if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if type is not None: + self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType) self.coordinator = coordinator diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 5eeb5c4cd..f9c3c0866 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3858,6 +3858,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyObject *kwargs = NULL; PyObject *assignment = NULL; const rd_kafka_MemberAssignment_t *c_assignment; + const rd_kafka_MemberAssignment_t *c_target_assignment; MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin", "MemberDescription"); @@ -3892,6 +3893,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const 
rd_kafka_MemberDescriptio PyDict_SetItemString(kwargs, "assignment", assignment); + c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); + if(c_target_assignment) { + PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); + if (!target_assignment) { + goto err; + } + PyDict_SetItemString(kwargs, "target_assignment", target_assignment); + } + args = PyTuple_New(0); member = PyObject_Call(MemberDescription_type, args, kwargs); @@ -4003,6 +4013,8 @@ static PyObject *Admin_c_ConsumerGroupDescription_to_py( cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description)); + cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description)); + args = PyTuple_New(0); consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs); diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py new file mode 100644 index 000000000..af6dd9a37 --- /dev/null +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time +import pytest + +from confluent_kafka import Consumer, ConsumerGroupState, ConsumerGroupType, TopicPartition +from confluent_kafka.admin import AdminClient +import uuid + +from tests.common import TestUtils + +topic_prefix = "test-topic" +# Generate random group IDs + +def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): + conf = {'group.id': group_id, + 'client.id': client_id, + 'group.protocol': Protocol, + 'session.timeout.ms': 6000, + 'enable.auto.commit': False, + 'auto.offset.reset': 'earliest', + 'debug': 'all'} + consumer = kafka_cluster.consumer(conf) + consumer.subscribe([topic]) + consumer.poll(10) + return consumer + +def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): + + group_id_new1 = f"test-group_new1-{uuid.uuid4()}" + group_id_new2 = f"test-group_new2-{uuid.uuid4()}" + group_id_old1 = f"test-group_old1-{uuid.uuid4()}" + group_id_old2 = f"test-group_old2-{uuid.uuid4()}" + + client_id1 = "test-client1" + client_id2 = "test-client2" + client_id3 = "test-client3" + client_id4 = "test-client4" + + consumers = [] + + # Create two groups with new group protocol + consumers.append(create_consumers(kafka_cluster, topic, group_id_new1, client_id1, "consumer")) + consumers.append(create_consumers(kafka_cluster, topic, group_id_new2, client_id2, "consumer")) + + # Create two groups with old group protocol + consumers.append(create_consumers(kafka_cluster, topic, group_id_old1, client_id3, "classic")) + consumers.append(create_consumers(kafka_cluster, topic, group_id_old2, client_id4, "classic")) + + partition = [TopicPartition(topic, 0)] + + # We will pass 3 requests, one containing the two groups created with new + # group protocol and the other containing the two groups created with old + # group protocol and the third containing all the groups and verify the results. 
+ fs1 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2]) + for group_id, f in fs1.items(): + result = f.result() + assert result.group_id in [group_id_new1, group_id_new2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + assert result.type == ConsumerGroupType.CONSUMER + assert len(result.members) == 1 + for member in result.members: + assert member.client_id in [client_id1, client_id2] + assert member.assignment.topic_partitions == partition + + fs2 = admin_client.describe_consumer_groups(group_ids=[group_id_old1, group_id_old2]) + for group_id, f in fs2.items(): + result = f.result() + assert result.group_id in [group_id_old1, group_id_old2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + assert result.type == ConsumerGroupType.CLASSIC + assert len(result.members) == 1 + for member in result.members: + assert member.client_id in [client_id3, client_id4] + assert member.assignment.topic_partitions == partition + + fs3 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2, group_id_old1, group_id_old2]) + for group_id, f in fs3.items(): + result = f.result() + assert result.group_id in [group_id_new1, group_id_new2, group_id_old1, group_id_old2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + if result.group_id in [group_id_new1, group_id_new2]: + assert result.type == ConsumerGroupType.CONSUMER + else: + assert result.type == ConsumerGroupType.CLASSIC + assert len(result.members) == 1 + for member in result.members: + if result.group_id in [group_id_new1, group_id_new2]: + assert member.client_id in [client_id1, client_id2] + else: + assert member.client_id in [client_id3, client_id4] + assert member.assignment.topic_partitions == partition + + for consumer in consumers: + consumer.close() + +def 
test_describe_consumer_groups_compatability(kafka_cluster): + + admin_client = kafka_cluster.admin() + + # Create Topic + topic_config = {"compression.type": "gzip"} + our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": 1, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=False + ) + + if TestUtils.use_group_protocol_consumer(): + verify_describe_consumer_groups(kafka_cluster, admin_client, our_topic) + + # Delete created topic + fs = admin_client.delete_topics([our_topic]) + for topic, f in fs.items(): + f.result() \ No newline at end of file From d45971f6a8c6c0dc381d1c1bf52f6b5fa8415ce6 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 18 Dec 2024 01:58:42 +0530 Subject: [PATCH 02/12] style fix --- ..._describe_consumer_groups_compatability.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index af6dd9a37..65d83e7ca 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -13,17 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time -import pytest - -from confluent_kafka import Consumer, ConsumerGroupState, ConsumerGroupType, TopicPartition -from confluent_kafka.admin import AdminClient +from confluent_kafka import ConsumerGroupState, ConsumerGroupType, TopicPartition import uuid from tests.common import TestUtils topic_prefix = "test-topic" -# Generate random group IDs + def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): conf = {'group.id': group_id, @@ -38,6 +34,7 @@ def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): consumer.poll(10) return consumer + def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): group_id_new1 = f"test-group_new1-{uuid.uuid4()}" @@ -51,7 +48,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): client_id4 = "test-client4" consumers = [] - + # Create two groups with new group protocol consumers.append(create_consumers(kafka_cluster, topic, group_id_new1, client_id1, "consumer")) consumers.append(create_consumers(kafka_cluster, topic, group_id_new2, client_id2, "consumer")) @@ -76,7 +73,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): for member in result.members: assert member.client_id in [client_id1, client_id2] assert member.assignment.topic_partitions == partition - + fs2 = admin_client.describe_consumer_groups(group_ids=[group_id_old1, group_id_old2]) for group_id, f in fs2.items(): result = f.result() @@ -88,7 +85,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): for member in result.members: assert member.client_id in [client_id3, client_id4] assert member.assignment.topic_partitions == partition - + fs3 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2, group_id_old1, group_id_old2]) for group_id, f in fs3.items(): result = f.result() @@ -106,10 +103,11 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): else: assert member.client_id in 
[client_id3, client_id4] assert member.assignment.topic_partitions == partition - + for consumer in consumers: consumer.close() + def test_describe_consumer_groups_compatability(kafka_cluster): admin_client = kafka_cluster.admin() @@ -124,11 +122,11 @@ def test_describe_consumer_groups_compatability(kafka_cluster): }, validate_only=False ) - + if TestUtils.use_group_protocol_consumer(): verify_describe_consumer_groups(kafka_cluster, admin_client, our_topic) # Delete created topic fs = admin_client.delete_topics([our_topic]) for topic, f in fs.items(): - f.result() \ No newline at end of file + f.result() From 137dd1dbdefe0f2349fbdd85f55454dcbc8521fb Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 16 Apr 2025 21:16:32 +0530 Subject: [PATCH 03/12] removed session timeout --- .../admin/test_describe_consumer_groups_compatability.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index 65d83e7ca..80f775a49 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -25,7 +25,6 @@ def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): conf = {'group.id': group_id, 'client.id': client_id, 'group.protocol': Protocol, - 'session.timeout.ms': 6000, 'enable.auto.commit': False, 'auto.offset.reset': 'earliest', 'debug': 'all'} @@ -123,9 +122,6 @@ def test_describe_consumer_groups_compatability(kafka_cluster): validate_only=False ) - if TestUtils.use_group_protocol_consumer(): - verify_describe_consumer_groups(kafka_cluster, admin_client, our_topic) - # Delete created topic fs = admin_client.delete_topics([our_topic]) for topic, f in fs.items(): From 0e696d3e8793c3ade66bcaccf9e65c98a9517574 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 16 Apr 2025 23:59:11 +0530 Subject: [PATCH 04/12] style fix 
--- ...est_describe_consumer_groups_compatability.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index 80f775a49..dcac8e3c3 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -16,8 +16,6 @@ from confluent_kafka import ConsumerGroupState, ConsumerGroupType, TopicPartition import uuid -from tests.common import TestUtils - topic_prefix = "test-topic" @@ -114,13 +112,13 @@ def test_describe_consumer_groups_compatability(kafka_cluster): # Create Topic topic_config = {"compression.type": "gzip"} our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, - { - "num_partitions": 1, - "config": topic_config, - "replication_factor": 1, - }, - validate_only=False - ) + { + "num_partitions": 1, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=False + ) # Delete created topic fs = admin_client.delete_topics([our_topic]) From e52fcc506ef84f0f128978fa3c7bc53ef70826e2 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 11:32:44 +0530 Subject: [PATCH 05/12] Simplifying test code --- ..._describe_consumer_groups_compatability.py | 112 +++++++++--------- 1 file changed, 54 insertions(+), 58 deletions(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index dcac8e3c3..93bacd7bf 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -33,74 +33,70 @@ def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): + def create_consumer_groups(): + 
"""Create consumer groups with new and old protocols.""" + group_ids = { + "new1": f"test-group_new1-{uuid.uuid4()}", + "new2": f"test-group_new2-{uuid.uuid4()}", + "old1": f"test-group_old1-{uuid.uuid4()}", + "old2": f"test-group_old2-{uuid.uuid4()}", + } + client_ids = { + "new": ["test-client1", "test-client2"], + "old": ["test-client3", "test-client4"], + } + + consumers = [ + create_consumers(kafka_cluster, topic, group_ids["new1"], client_ids["new"][0], "consumer"), + create_consumers(kafka_cluster, topic, group_ids["new2"], client_ids["new"][1], "consumer"), + create_consumers(kafka_cluster, topic, group_ids["old1"], client_ids["old"][0], "classic"), + create_consumers(kafka_cluster, topic, group_ids["old2"], client_ids["old"][1], "classic"), + ] + return group_ids, client_ids, consumers + + def verify_consumer_group_results(fs, expected_group_ids, expected_type, expected_clients): + """Verify the results of consumer group descriptions.""" + for group_id, f in fs.items(): + result = f.result() + assert result.group_id in expected_group_ids + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + assert result.type == expected_type + assert len(result.members) == 1 + for member in result.members: + assert member.client_id in expected_clients + assert member.assignment.topic_partitions == partition + + # Create consumer groups + group_ids, client_ids, consumers = create_consumer_groups() + partition = [TopicPartition(topic, 0)] - group_id_new1 = f"test-group_new1-{uuid.uuid4()}" - group_id_new2 = f"test-group_new2-{uuid.uuid4()}" - group_id_old1 = f"test-group_old1-{uuid.uuid4()}" - group_id_old2 = f"test-group_old2-{uuid.uuid4()}" - - client_id1 = "test-client1" - client_id2 = "test-client2" - client_id3 = "test-client3" - client_id4 = "test-client4" - - consumers = [] - - # Create two groups with new group protocol - consumers.append(create_consumers(kafka_cluster, topic, group_id_new1, client_id1, "consumer")) - 
consumers.append(create_consumers(kafka_cluster, topic, group_id_new2, client_id2, "consumer")) - - # Create two groups with old group protocol - consumers.append(create_consumers(kafka_cluster, topic, group_id_old1, client_id3, "classic")) - consumers.append(create_consumers(kafka_cluster, topic, group_id_old2, client_id4, "classic")) + # Describe and verify new group protocol consumer groups + fs_new = admin_client.describe_consumer_groups([group_ids["new1"], group_ids["new2"]]) + verify_consumer_group_results(fs_new, [group_ids["new1"], group_ids["new2"]], + ConsumerGroupType.CONSUMER, client_ids["new"]) - partition = [TopicPartition(topic, 0)] + # Describe and verify old group protocol consumer groups + fs_old = admin_client.describe_consumer_groups([group_ids["old1"], group_ids["old2"]]) + verify_consumer_group_results(fs_old, [group_ids["old1"], group_ids["old2"]], + ConsumerGroupType.CLASSIC, client_ids["old"]) - # We will pass 3 requests, one containing the two groups created with new - # group protocol and the other containing the two groups created with old - # group protocol and the third containing all the groups and verify the results. 
- fs1 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2]) - for group_id, f in fs1.items(): - result = f.result() - assert result.group_id in [group_id_new1, group_id_new2] - assert result.is_simple_consumer_group is False - assert result.state == ConsumerGroupState.STABLE - assert result.type == ConsumerGroupType.CONSUMER - assert len(result.members) == 1 - for member in result.members: - assert member.client_id in [client_id1, client_id2] - assert member.assignment.topic_partitions == partition - - fs2 = admin_client.describe_consumer_groups(group_ids=[group_id_old1, group_id_old2]) - for group_id, f in fs2.items(): - result = f.result() - assert result.group_id in [group_id_old1, group_id_old2] - assert result.is_simple_consumer_group is False - assert result.state == ConsumerGroupState.STABLE - assert result.type == ConsumerGroupType.CLASSIC - assert len(result.members) == 1 - for member in result.members: - assert member.client_id in [client_id3, client_id4] - assert member.assignment.topic_partitions == partition - - fs3 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2, group_id_old1, group_id_old2]) - for group_id, f in fs3.items(): + # Describe and verify all consumer groups + fs_all = admin_client.describe_consumer_groups(list(group_ids.values())) + for group_id, f in fs_all.items(): result = f.result() - assert result.group_id in [group_id_new1, group_id_new2, group_id_old1, group_id_old2] + assert result.group_id in group_ids.values() assert result.is_simple_consumer_group is False assert result.state == ConsumerGroupState.STABLE - if result.group_id in [group_id_new1, group_id_new2]: + if result.group_id in [group_ids["new1"], group_ids["new2"]]: assert result.type == ConsumerGroupType.CONSUMER + assert result.members[0].client_id in client_ids["new"] else: assert result.type == ConsumerGroupType.CLASSIC - assert len(result.members) == 1 - for member in result.members: - if result.group_id in 
[group_id_new1, group_id_new2]: - assert member.client_id in [client_id1, client_id2] - else: - assert member.client_id in [client_id3, client_id4] - assert member.assignment.topic_partitions == partition + assert result.members[0].client_id in client_ids["old"] + assert result.members[0].assignment.topic_partitions == partition + # Close all consumers for consumer in consumers: consumer.close() From 94a9df0fd7724915faedf2fe57bdff6b280c6f9a Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 13:09:01 +0530 Subject: [PATCH 06/12] target assignment declaration --- examples/adminapi.py | 2 +- src/confluent_kafka/admin/_group.py | 4 ++-- src/confluent_kafka/src/Admin.c | 5 ++++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 29030b0de..0eb87216c 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -552,7 +552,7 @@ def example_describe_consumer_groups(a, args): if member.target_assignment: print(" Target Assignments:") for toppar in member.target_assignment.topic_partitions: - print(" {} [{}]".format(toppar.topic, toppar.partition)) + print(f" {toppar.topic} [{toppar.partition}]") if (include_auth_ops): print(" Authorized operations: ") op_string = "" diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 1f7882cc4..7823e976a 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -100,7 +100,7 @@ class MemberDescription: The instance id of the group member. 
""" - def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None): + def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None): self.member_id = member_id self.client_id = client_id self.host = host @@ -135,7 +135,7 @@ class ConsumerGroupDescription: """ def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - type, coordinator, authorized_operations=None): + coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index f9c3c0866..451017ad6 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3857,6 +3857,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyObject *args = NULL; PyObject *kwargs = NULL; PyObject *assignment = NULL; + PyObject *target_assignment = NULL; const rd_kafka_MemberAssignment_t *c_assignment; const rd_kafka_MemberAssignment_t *c_target_assignment; @@ -3895,7 +3896,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); if(c_target_assignment) { - PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); + target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); if (!target_assignment) { goto err; } @@ -3910,6 +3911,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio Py_DECREF(kwargs); Py_DECREF(MemberDescription_type); Py_DECREF(assignment); + Py_XDECREF(target_assignment); return member; err: @@ -3918,6 +3920,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio Py_XDECREF(kwargs); Py_XDECREF(MemberDescription_type); 
Py_XDECREF(assignment); + Py_XDECREF(target_assignment); Py_XDECREF(member); return NULL; } From 55d7f080eed1958cf9528a47e706c5933b450356 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 16:06:48 +0530 Subject: [PATCH 07/12] Protocol->group_protocol --- .../admin/test_describe_consumer_groups_compatability.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index 93bacd7bf..08d9aadda 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -19,13 +19,12 @@ topic_prefix = "test-topic" -def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): +def create_consumers(kafka_cluster, topic, group_id, client_id, group_protocol): conf = {'group.id': group_id, 'client.id': client_id, - 'group.protocol': Protocol, + 'group.protocol': group_protocol, 'enable.auto.commit': False, - 'auto.offset.reset': 'earliest', - 'debug': 'all'} + 'auto.offset.reset': 'earliest'} consumer = kafka_cluster.consumer(conf) consumer.subscribe([topic]) consumer.poll(10) From 80a1e3a81576e73546206653cd8ce798866c9165 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 16:27:55 +0530 Subject: [PATCH 08/12] name change --- .../admin/test_describe_consumer_groups_compatability.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py index 08d9aadda..19c43aad8 100644 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -100,7 +100,7 @@ def verify_consumer_group_results(fs, expected_group_ids, expected_type, expecte consumer.close() 
-def test_describe_consumer_groups_compatability(kafka_cluster): +def test_describe_consumer_groups_compatibility(kafka_cluster): admin_client = kafka_cluster.admin() From 1d2716e2fd0176e0d5eddd3669665ae0a5edfa8f Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 16:46:51 +0530 Subject: [PATCH 09/12] Changelog added --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc0ac4857..36a46e565 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Confluent's Python client for Apache Kafka +## v2.10.0 + +v2.10.0 is a feature release with the following fixes and enhancements: + +- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856) +- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922). + ## v2.9.0 v2.9.0 is a feature release with the following fixes and enhancements: From dd43a7d3c6073f15ea5fb2d32d1d5916373166df Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 17:43:40 +0530 Subject: [PATCH 10/12] removed test --- ..._describe_consumer_groups_compatability.py | 121 ------------------ 1 file changed, 121 deletions(-) delete mode 100644 tests/integration/admin/test_describe_consumer_groups_compatability.py diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py deleted file mode 100644 index 19c43aad8..000000000 --- a/tests/integration/admin/test_describe_consumer_groups_compatability.py +++ /dev/null @@ -1,121 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2024 Confluent Inc. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from confluent_kafka import ConsumerGroupState, ConsumerGroupType, TopicPartition -import uuid - -topic_prefix = "test-topic" - - -def create_consumers(kafka_cluster, topic, group_id, client_id, group_protocol): - conf = {'group.id': group_id, - 'client.id': client_id, - 'group.protocol': group_protocol, - 'enable.auto.commit': False, - 'auto.offset.reset': 'earliest'} - consumer = kafka_cluster.consumer(conf) - consumer.subscribe([topic]) - consumer.poll(10) - return consumer - - -def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): - def create_consumer_groups(): - """Create consumer groups with new and old protocols.""" - group_ids = { - "new1": f"test-group_new1-{uuid.uuid4()}", - "new2": f"test-group_new2-{uuid.uuid4()}", - "old1": f"test-group_old1-{uuid.uuid4()}", - "old2": f"test-group_old2-{uuid.uuid4()}", - } - client_ids = { - "new": ["test-client1", "test-client2"], - "old": ["test-client3", "test-client4"], - } - - consumers = [ - create_consumers(kafka_cluster, topic, group_ids["new1"], client_ids["new"][0], "consumer"), - create_consumers(kafka_cluster, topic, group_ids["new2"], client_ids["new"][1], "consumer"), - create_consumers(kafka_cluster, topic, group_ids["old1"], client_ids["old"][0], "classic"), - create_consumers(kafka_cluster, topic, group_ids["old2"], client_ids["old"][1], "classic"), - ] - return group_ids, client_ids, consumers - - def verify_consumer_group_results(fs, 
expected_group_ids, expected_type, expected_clients): - """Verify the results of consumer group descriptions.""" - for group_id, f in fs.items(): - result = f.result() - assert result.group_id in expected_group_ids - assert result.is_simple_consumer_group is False - assert result.state == ConsumerGroupState.STABLE - assert result.type == expected_type - assert len(result.members) == 1 - for member in result.members: - assert member.client_id in expected_clients - assert member.assignment.topic_partitions == partition - - # Create consumer groups - group_ids, client_ids, consumers = create_consumer_groups() - partition = [TopicPartition(topic, 0)] - - # Describe and verify new group protocol consumer groups - fs_new = admin_client.describe_consumer_groups([group_ids["new1"], group_ids["new2"]]) - verify_consumer_group_results(fs_new, [group_ids["new1"], group_ids["new2"]], - ConsumerGroupType.CONSUMER, client_ids["new"]) - - # Describe and verify old group protocol consumer groups - fs_old = admin_client.describe_consumer_groups([group_ids["old1"], group_ids["old2"]]) - verify_consumer_group_results(fs_old, [group_ids["old1"], group_ids["old2"]], - ConsumerGroupType.CLASSIC, client_ids["old"]) - - # Describe and verify all consumer groups - fs_all = admin_client.describe_consumer_groups(list(group_ids.values())) - for group_id, f in fs_all.items(): - result = f.result() - assert result.group_id in group_ids.values() - assert result.is_simple_consumer_group is False - assert result.state == ConsumerGroupState.STABLE - if result.group_id in [group_ids["new1"], group_ids["new2"]]: - assert result.type == ConsumerGroupType.CONSUMER - assert result.members[0].client_id in client_ids["new"] - else: - assert result.type == ConsumerGroupType.CLASSIC - assert result.members[0].client_id in client_ids["old"] - assert result.members[0].assignment.topic_partitions == partition - - # Close all consumers - for consumer in consumers: - consumer.close() - - -def 
test_describe_consumer_groups_compatibility(kafka_cluster): - - admin_client = kafka_cluster.admin() - - # Create Topic - topic_config = {"compression.type": "gzip"} - our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, - { - "num_partitions": 1, - "config": topic_config, - "replication_factor": 1, - }, - validate_only=False - ) - - # Delete created topic - fs = admin_client.delete_topics([our_topic]) - for topic, f in fs.items(): - f.result() From 5808fc1d65775d2190e41604bfca806171e70542 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 17 Apr 2025 18:05:56 +0530 Subject: [PATCH 11/12] Changelog change --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36a46e565..3b148fc02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,11 @@ v2.10.0 is a feature release with the following fixes and enhancements: - [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856) -- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922). +- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and it defaults to NULL. (#1873). + +confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. 
## v2.9.0 From 933e813546bc8c9103f0a1a00935eb89f80fb17b Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Thu, 17 Apr 2025 18:39:08 +0530 Subject: [PATCH 12/12] v2.10.0rc3 release changes (#1969) --- docs/conf.py | 4 ++-- examples/docker/Dockerfile.alpine | 2 +- pyproject.toml | 2 +- src/confluent_kafka/src/confluent_kafka.h | 14 +++++++------- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 95fca1765..097484066 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,14 +20,14 @@ ###################################################################### # General information about the project. project = u'confluent-kafka' -copyright = u'2016-2024, Confluent Inc.' +copyright = u'2016-2025, Confluent Inc.' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = '2.9.0' +version = '2.10.0rc3' # The full version, including alpha/beta/rc tags. release = version ###################################################################### diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine index c7f37fe4a..db08bad1e 100644 --- a/examples/docker/Dockerfile.alpine +++ b/examples/docker/Dockerfile.alpine @@ -30,7 +30,7 @@ FROM alpine:3.12 COPY . 
/usr/src/confluent-kafka-python -ENV LIBRDKAFKA_VERSION="v2.8.0" +ENV LIBRDKAFKA_VERSION="v2.10.0-RC3" ENV KCAT_VERSION="master" ENV CKP_VERSION="master" diff --git a/pyproject.toml b/pyproject.toml index e581b8e45..c3b7aa814 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "confluent-kafka" -version = "2.9.0" +version = "2.10.0rc3" description = "Confluent's Python client for Apache Kafka" classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 12d7c14e1..871ce88af 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -42,8 +42,8 @@ * 0xMMmmRRPP * MM=major, mm=minor, RR=revision, PP=patchlevel (not used) */ -#define CFL_VERSION 0x02090000 -#define CFL_VERSION_STR "2.9.0" +#define CFL_VERSION 0x020a0000 +#define CFL_VERSION_STR "2.10.0rc3" /** * Minimum required librdkafka version. This is checked both during @@ -51,19 +51,19 @@ * Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error * defines and strings in sync. */ -#define MIN_RD_KAFKA_VERSION 0x020800ff +#define MIN_RD_KAFKA_VERSION 0x020a00ff #ifdef __APPLE__ -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. 
Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION #ifdef __APPLE__ -#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #endif