Skip to content

Commit

Permalink
Call the task_received signal.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Jan 8, 2024
1 parent 1ac35c5 commit ae747d8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
Changelog
#########

next
====

Improvements
------------

* Call the task received signal for ``Batches`` task. (`#85 <https://github.com/clokep/celery-batches/pull/85>`_)


0.8.1 (2023-06-27)
==================

Expand Down Expand Up @@ -34,6 +43,7 @@ Maintenance
* Support Python 3.11. (`#75 <https://github.com/clokep/celery-batches/pull/75>`_)
* Drop support for Python 3.7. (`#77 <https://github.com/clokep/celery-batches/pull/77>`_)


0.7 (2022-05-02)
================

Expand Down
7 changes: 5 additions & 2 deletions celery_batches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from celery_batches.trace import apply_batches_task

from celery import VERSION as CELERY_VERSION
from celery import signals
from celery.app import Celery
from celery.app.task import Task
from celery.concurrency.base import BasePool
Expand Down Expand Up @@ -237,7 +238,7 @@ def task_message_handler(
else:
body, headers, decoded, utc = proto1_to_proto2(message, body)

request = Req(
req = Req(
message,
on_ack=ack,
on_reject=reject,
Expand All @@ -251,7 +252,9 @@ def task_message_handler(
utc=utc,
connection_errors=connection_errors,
)
put_buffer(request)
put_buffer(req)

signals.task_received.send(sender=consumer, request=req)

if self._tref is None: # first request starts flush timer.
self._tref = timer.call_repeatedly(self.flush_interval, flush_buffer)
Expand Down
6 changes: 3 additions & 3 deletions t/integration/test_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ def test_signals(celery_app: Celery, celery_worker: TestWorkController) -> None:
(signals.before_task_publish, 2),
(signals.after_task_publish, 2),
(signals.task_sent, 2),
# The task only runs a single time.
(signals.task_received, 2),
# The Batch task only runs a single time.
(signals.task_prerun, 1),
(signals.task_postrun, 1),
(signals.task_received, 0),
(signals.task_success, 1),
# Other task signals are not implemented.
(signals.task_retry, 0),
(signals.task_success, 1),
(signals.task_failure, 0),
(signals.task_revoked, 0),
(signals.task_internal_error, 0),
Expand Down

0 comments on commit ae747d8

Please sign in to comment.