Skip to content

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #1873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 17, 2025
Merged
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Confluent's Python client for Apache Kafka

## v2.10.0

v2.10.0 is a feature release with the following fixes and enhancements:

- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856)
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#1873).

confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

## v2.9.0

v2.9.0 is a feature release with the following fixes and enhancements:
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
######################################################################
# General information about the project.
project = u'confluent-kafka'
copyright = u'2016-2024, Confluent Inc.'
copyright = u'2016-2025, Confluent Inc.'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '2.9.0'
version = '2.10.0rc3'
# The full version, including alpha/beta/rc tags.
release = version
######################################################################
Expand Down
5 changes: 5 additions & 0 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args):
print("Group Id: {}".format(g.group_id))
print(" Is Simple : {}".format(g.is_simple_consumer_group))
print(" State : {}".format(g.state))
print(" Type : {}".format(g.type))
print(" Partition Assignor : {}".format(g.partition_assignor))
print(
f" Coordinator : {g.coordinator}")
Expand All @@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args):
print(" Assignments :")
for toppar in member.assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
if member.target_assignment:
print(" Target Assignments:")
for toppar in member.target_assignment.topic_partitions:
print(f" {toppar.topic} [{toppar.partition}]")
if (include_auth_ops):
print(" Authorized operations: ")
op_string = ""
Expand Down
2 changes: 1 addition & 1 deletion examples/docker/Dockerfile.alpine
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ FROM alpine:3.12

COPY . /usr/src/confluent-kafka-python

ENV LIBRDKAFKA_VERSION="v2.8.0"
ENV LIBRDKAFKA_VERSION="v2.10.0-RC3"
ENV KCAT_VERSION="master"
ENV CKP_VERSION="master"

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "confluent-kafka"
version = "2.9.0"
version = "2.10.0rc3"
description = "Confluent's Python client for Apache Kafka"
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand Down
11 changes: 9 additions & 2 deletions src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,18 @@ class MemberDescription:
The host where the group member is running.
assignment: MemberAssignment
The assignment of the group member
target_assignment: MemberAssignment
The target assignment of the group member
group_instance_id : str
The instance id of the group member.
"""

def __init__(self, member_id, client_id, host, assignment, group_instance_id=None):
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None):
self.member_id = member_id
self.client_id = client_id
self.host = host
self.assignment = assignment
self.target_assignment = target_assignment
self.group_instance_id = group_instance_id


Expand All @@ -123,14 +126,16 @@ class ConsumerGroupDescription:
Partition assignor.
state : ConsumerGroupState
Current state of the consumer group.
type : ConsumerGroupType
Type of the consumer group.
coordinator: Node
Consumer group coordinator.
authorized_operations: list(AclOperation)
AclOperations allowed for the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
coordinator, authorized_operations=None):
coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
self.members = members
Expand All @@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
self.partition_assignor = partition_assignor
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if type is not None:
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
self.coordinator = coordinator
15 changes: 15 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3857,7 +3857,9 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
PyObject *args = NULL;
PyObject *kwargs = NULL;
PyObject *assignment = NULL;
PyObject *target_assignment = NULL;
const rd_kafka_MemberAssignment_t *c_assignment;
const rd_kafka_MemberAssignment_t *c_target_assignment;

MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin",
"MemberDescription");
Expand Down Expand Up @@ -3892,6 +3894,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio

PyDict_SetItemString(kwargs, "assignment", assignment);

c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member);
if(c_target_assignment) {
target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment);
if (!target_assignment) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check NULL case is working or not. target_assignment should be None in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is working

goto err;
}
PyDict_SetItemString(kwargs, "target_assignment", target_assignment);
}

args = PyTuple_New(0);

member = PyObject_Call(MemberDescription_type, args, kwargs);
Expand All @@ -3900,6 +3911,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
Py_DECREF(kwargs);
Py_DECREF(MemberDescription_type);
Py_DECREF(assignment);
Py_XDECREF(target_assignment);
return member;

err:
Expand All @@ -3908,6 +3920,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
Py_XDECREF(kwargs);
Py_XDECREF(MemberDescription_type);
Py_XDECREF(assignment);
Py_XDECREF(target_assignment);
Py_XDECREF(member);
return NULL;
}
Expand Down Expand Up @@ -4003,6 +4016,8 @@ static PyObject *Admin_c_ConsumerGroupDescription_to_py(

cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description));

cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description));

args = PyTuple_New(0);

consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs);
Expand Down
14 changes: 7 additions & 7 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@
* 0xMMmmRRPP
* MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
*/
#define CFL_VERSION 0x02090000
#define CFL_VERSION_STR "2.9.0"
#define CFL_VERSION 0x020a0000
#define CFL_VERSION_STR "2.10.0rc3"

/**
* Minimum required librdkafka version. This is checked both during
* build-time (just below) and runtime (see confluent_kafka.c).
* Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error
* defines and strings in sync.
*/
#define MIN_RD_KAFKA_VERSION 0x020800ff
#define MIN_RD_KAFKA_VERSION 0x020a00ff

#ifdef __APPLE__
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
#else
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
#endif

#if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
#ifdef __APPLE__
#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
#else
#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
#endif
#endif

Expand Down