diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 67bcacb..97be071 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,15 @@ Changelog ######### +next +==== + +Improvements +------------ + +* Call the task received signal for ``Batches`` task. (`#85 `_) + + 0.8.1 (2023-06-27) ================== @@ -34,6 +43,7 @@ Maintenance * Support Python 3.11. (`#75 `_) * Drop support for Python 3.7. (`#77 `_) + 0.7 (2022-05-02) ================ diff --git a/celery_batches/__init__.py b/celery_batches/__init__.py index b8aa179..7ce1cbf 100644 --- a/celery_batches/__init__.py +++ b/celery_batches/__init__.py @@ -17,6 +17,7 @@ from celery_batches.trace import apply_batches_task from celery import VERSION as CELERY_VERSION +from celery import signals from celery.app import Celery from celery.app.task import Task from celery.concurrency.base import BasePool @@ -237,7 +238,7 @@ def task_message_handler( else: body, headers, decoded, utc = proto1_to_proto2(message, body) - request = Req( + req = Req( message, on_ack=ack, on_reject=reject, @@ -251,7 +252,9 @@ def task_message_handler( utc=utc, connection_errors=connection_errors, ) - put_buffer(request) + put_buffer(req) + + signals.task_received.send(sender=consumer, request=req) if self._tref is None: # first request starts flush timer. self._tref = timer.call_repeatedly(self.flush_interval, flush_buffer) diff --git a/t/integration/test_batches.py b/t/integration/test_batches.py index 61bf331..2283d07 100644 --- a/t/integration/test_batches.py +++ b/t/integration/test_batches.py @@ -157,13 +157,13 @@ def test_signals(celery_app: Celery, celery_worker: TestWorkController) -> None: (signals.before_task_publish, 2), (signals.after_task_publish, 2), (signals.task_sent, 2), - # The task only runs a single time. + (signals.task_received, 2), + # The Batch task only runs a single time. (signals.task_prerun, 1), (signals.task_postrun, 1), - (signals.task_received, 0), + (signals.task_success, 1), # Other task signals are not implemented. (signals.task_retry, 0), - (signals.task_success, 1), (signals.task_failure, 0), (signals.task_revoked, 0), (signals.task_internal_error, 0),