Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"range" assignment updates #712

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
# Enforce idempotent producing for the internal RowProducer
_default_producer_extra_config = {"enable.idempotence": True}

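# Force the assignment strategy to "range" for co-partitioning in internal Consumers.
# These overrides are merged last, so they take precedence over any user-provided
# "partition.assignment.strategy" in the consumer "extra_config".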
consumer_extra_config_overrides = {"partition.assignment.strategy": "range"}

_default_max_poll_interval_ms = 300000


Expand Down Expand Up @@ -301,7 +304,10 @@ def __init__(
self._on_message_processed = on_message_processed
self._on_processing_error = on_processing_error or default_on_processing_error

self._consumer = self._get_rowconsumer(on_error=on_consumer_error)
self._consumer = self._get_rowconsumer(
on_error=on_consumer_error,
extra_config_overrides=consumer_extra_config_overrides,
)
self._producer = self._get_rowproducer(on_error=on_producer_error)
self._running = False
self._failed = False
Expand Down Expand Up @@ -593,20 +599,27 @@ def get_producer(self) -> Producer:
)

def _get_rowconsumer(
self, on_error: Optional[ConsumerErrorCallback] = None
self,
on_error: Optional[ConsumerErrorCallback] = None,
extra_config_overrides: Optional[dict] = None,
) -> RowConsumer:
"""
Create a RowConsumer using the application config

Used to create the application consumer as well as the source consumers
"""

extra_config_overrides = extra_config_overrides or {}
# Override the existing "extra_config" with new values
extra_config = {
**self._config.consumer_extra_config,
**extra_config_overrides,
}
return RowConsumer(
broker_address=self._config.broker_address,
consumer_group=self._config.consumer_group,
auto_offset_reset=self._config.auto_offset_reset,
auto_commit_enable=False, # Disable auto commit and manage commits manually
extra_config=self._config.consumer_extra_config,
extra_config=extra_config,
on_error=on_error,
)

Expand Down Expand Up @@ -692,7 +705,9 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
source,
topic,
self._get_rowproducer(transactional=False),
self._get_rowconsumer(),
self._get_rowconsumer(
extra_config_overrides=consumer_extra_config_overrides
),
self._get_topic_manager(),
)
return topic
Expand Down Expand Up @@ -897,10 +912,6 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
# get the source data.
self._source_manager.start_sources()

# First commit everything processed so far because assignment can take a while
# and fail
self._processing_context.commit_checkpoint(force=True)

# Assign partitions manually to pause the changelog topics
self._consumer.assign(topic_partitions)
# Pause changelog topic+partitions immediately after assignment
Expand Down
1 change: 0 additions & 1 deletion quixstreams/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def __init__(
self._consumer_config = {
# previous Quix Streams defaults
"enable.auto.offset.store": False,
# Force assignment strategy to "range" for co-partitioning
"partition.assignment.strategy": "range",
**(extra_config or {}),
**broker_address.as_librdkafka_dict(),
Expand Down
3 changes: 3 additions & 0 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ def test_init_with_quix_sdk_token_arg(self):
expected_consumer_extra_config = {
**extra_config,
**quix_extras,
"partition.assignment.strategy": "range",
}

def get_cfg_builder(quix_sdk_token):
Expand Down Expand Up @@ -903,6 +904,7 @@ def test_init_with_quix_sdk_token_env(self, monkeypatch):
expected_consumer_extra_config = {
**extra_config,
**quix_extras,
"partition.assignment.strategy": "range",
}

def get_cfg_builder(quix_sdk_token):
Expand Down Expand Up @@ -965,6 +967,7 @@ def test_init_with_quix_config_builder(self):
expected_consumer_extra_config = {
**extra_config,
**quix_extras,
"partition.assignment.strategy": "range",
}

def get_cfg_builder(quix_sdk_token):
Expand Down
Loading