This proof of concept (PoC) demonstrates a distributed task queue implementation using Celery with RabbitMQ as the message broker and Redis as the result backend. The project simulates a data collection and processing workflow using a system of subcollectors. Each subcollector generates a random list of integers for a given topic, and the results are aggregated, cleaned, and processed.
The primary goal of this PoC is to test and demonstrate the capability of Celery to manage complex workflows involving multiple asynchronous tasks. It showcases how to:
- Dispatch tasks to subcollectors in parallel.
- Aggregate results from all subcollectors.
- Perform cleanup and processing on aggregated data.
The repository is organized as follows:

- `celery_app.py`: Initializes the Celery app and configures RabbitMQ as the message broker and Redis as the result backend (a minimal sketch follows this list).
- `tasks.py`: Contains the definitions of all Celery tasks, including the subcollector, aggregation, cleanup, and processing tasks.
- `tests/`: Directory containing the test files that validate the Celery tasks.
  - `test_tasks.py`: Implements tests for each Celery task using pytest.
- `conda/base.yaml`: Conda environment file listing all Python dependencies for the project.
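For orientation, here is a minimal sketch of what `celery_app.py` typically contains. The app name, connection URLs, and `include` path are assumptions for illustration, not the project's actual values:

```python
from celery import Celery

# Hypothetical configuration: RabbitMQ as the broker, Redis as the result backend.
app = Celery(
    "poc_celery",                                   # assumed app name
    broker="amqp://guest:guest@localhost:5672//",   # RabbitMQ message broker
    backend="redis://localhost:6379/0",             # Redis result backend
    include=["poc_celery.tasks"],                   # assumed task module path
)
```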
The Celery workflow in this project orchestrates a series of tasks to simulate a data collection and processing pipeline using subcollector tasks. The workflow leverages Celery's capabilities for asynchronous task execution, task grouping, and result aggregation. Below is a step-by-step breakdown of the workflow:
- Collector Request: `collector_request` (an end-to-end sketch of the whole pipeline follows this list)
  - Functionality: The workflow begins with the `collector_request` task, which initiates the data collection process for a specified topic.
  - Key methods:
    - `generate_collector_request(topic: str) -> str`: Generates a unique request ID for the collection request.
    - `group()`: Groups the subcollector tasks (`collector_1`, `collector_2`, `collector_3`) so they can be executed in parallel.
    - `chord(group())(callback)`: A `chord` is a Celery primitive that takes a group of tasks and a callback task. It ensures that the callback (`collector_gathering`) runs only after every task in the group has completed.
- Subcollectors: `collector_1`, `collector_2`, `collector_3`
  - Functionality: Each subcollector task generates a random list of integers, simulating data collection for the given topic.
  - Execution: These tasks run in parallel as part of the `group` passed to the `chord`. The group dispatches each task asynchronously (via `.apply_async()`), so no task waits for the others.
- Callback Task: `collector_gathering`
  - Functionality: Aggregates the results from all subcollector tasks. As the callback of the `chord`, it automatically receives the combined results of the group as its input.
  - Method calls:
    - `cleanup.delay(combined_result, request_id)`: After aggregation, the `cleanup` task is called asynchronously via `.delay()`, passing the combined results on for further processing.
- Cleanup Task: `cleanup`
  - Functionality: Performs preliminary processing or cleanup on the aggregated data.
  - Method calls:
    - `process.delay(data, request_id)`: Calls the `process` task asynchronously for final processing.
- Process Task: `process`
  - Functionality: Performs the final processing of the data. In this example, it counts the total number of items and prints the result.
  - Returns: A dictionary with the `request_id` and the `item_count`, giving a simple summary of the processing outcome.
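To make the walkthrough concrete, here is a hedged, self-contained sketch of how these five tasks could be wired together. The task names and overall flow follow this README, but the payloads, exact signatures, broker URLs, and module layout are illustrative assumptions rather than the project's actual code:

```python
import random
import uuid

from celery import Celery, chord, group

# Assumed connection URLs; see celery_app.py in the real project.
app = Celery("poc_celery", broker="amqp://localhost//", backend="redis://localhost:6379/0")


def generate_collector_request(topic: str) -> str:
    """Generate a unique request ID for the collection request."""
    return str(uuid.uuid4())


@app.task
def collector_1(topic: str) -> list:
    # Simulate collecting data for the topic as a random list of integers.
    return [random.randint(0, 100) for _ in range(random.randint(1, 10))]


@app.task
def collector_2(topic: str) -> list:
    return [random.randint(0, 100) for _ in range(random.randint(1, 10))]


@app.task
def collector_3(topic: str) -> list:
    return [random.randint(0, 100) for _ in range(random.randint(1, 10))]


@app.task
def collector_request(topic: str) -> str:
    request_id = generate_collector_request(topic)
    # Fan out to the subcollectors in parallel; collector_gathering receives
    # the list of their results once every task in the group has finished.
    chord(
        group(collector_1.s(topic), collector_2.s(topic), collector_3.s(topic))
    )(collector_gathering.s(request_id))
    return request_id


@app.task
def collector_gathering(results: list, request_id: str) -> None:
    # `results` is a list of lists, one entry per subcollector; flatten it.
    combined_result = [item for sublist in results for item in sublist]
    cleanup.delay(combined_result, request_id)


@app.task
def cleanup(data: list, request_id: str) -> None:
    # Placeholder cleanup step: drop empty values before final processing.
    process.delay([item for item in data if item is not None], request_id)


@app.task
def process(data: list, request_id: str) -> dict:
    item_count = len(data)
    print(f"Request {request_id} processed {item_count} items")
    return {"request_id": request_id, "item_count": item_count}
```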
This workflow demonstrates the power of Celery for handling complex asynchronous task pipelines. It showcases task parallelization (`group`), conditional task execution based on the completion of a group of tasks (`chord`), and the chaining of further processing steps (`delay`). Each task performs a specific role within the data collection and processing pipeline, from initiating collection requests to final data processing.
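Assuming a worker is running, the pipeline could be kicked off as shown below; the import path and topic string are placeholders, not the project's actual names:

```python
from poc_celery.tasks import collector_request  # assumed import path

# Dispatch the whole pipeline asynchronously and read back the request ID.
async_result = collector_request.delay("some-topic")
request_id = async_result.get(timeout=10)
print(f"Collection started with request ID {request_id}")
```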
To run this PoC locally you will need:

- Docker
- Conda or Mamba
- Clone the repository and navigate to the project directory.

- Create a new Conda environment from the `conda/base.yaml` file:

  ```bash
  mamba env create -f conda/base.yaml
  ```

  Or, if you're using Conda:

  ```bash
  conda env create -f conda/base.yaml
  ```

- Activate the new environment:

  ```bash
  mamba activate poc-celery
  ```
Run the RabbitMQ and Redis containers with the following commands:

```bash
docker run --name rabbitmq -d -p 5672:5672 rabbitmq
docker run --name redis -d -p 6379:6379 redis
```

or run the helper script:

```bash
bash scripts/setup.sh
```

These commands start RabbitMQ and Redis servers in Docker containers named `rabbitmq` and `redis`, respectively.
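As an optional sanity check (not part of the project's scripts), the snippet below verifies from Python that both containers are reachable before a worker is started. It assumes the `redis` and `kombu` packages, which Celery pulls in as dependencies, and the RabbitMQ image's default `guest` credentials:

```python
import redis
from kombu import Connection

# Redis result backend on the port mapped by the container above.
redis.Redis(host="localhost", port=6379).ping()

# RabbitMQ broker with the image's default guest/guest credentials.
with Connection("amqp://guest:guest@localhost:5672//") as conn:
    conn.connect()

print("RabbitMQ and Redis are both reachable")
```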
To facilitate an efficient development and monitoring environment, we've prepared a bash script, `start_celery_and_flower.sh`, that starts a Celery worker and Flower, a real-time monitoring tool, at the same time. The script fetches the RabbitMQ and Redis container IP addresses dynamically, ensuring that both Celery and Flower are correctly configured to communicate with RabbitMQ as the broker and Redis as the result backend.
To start both the Celery worker and Flower, navigate to your project's root directory and run:

```bash
bash scripts/start_celery_and_flower.sh
```
This command executes the script, which:

- Starts a Celery worker: Launches a Celery worker instance using `poc_celery.poc1.celery_app` as the application module. This worker listens for tasks dispatched to the queues and executes them as they arrive.
- Launches Flower: Starts Flower on the default port (5555), giving you a web-based user interface to monitor and manage the Celery worker and tasks. Flower provides insight into task progress, worker status, task history, and much more, making it an invaluable tool for debugging and optimizing your task workflows.
With your environment prepared and the Celery worker launched, you can now run the tests with pytest. Navigate to the root directory of the project and execute:

```bash
pytest -vvv tests/
```
The test cases validate the behavior of the Celery tasks in the project, covering both the asynchronous tasks (`tasks_async.py`) and the collector tasks (`tasks_collectors.py`). Here's an overview of each test case:
- `test_clean_data`: Verifies that the `clean_data` task behaves correctly when called with a file path. It uses `mock_open` to simulate file I/O and asserts that the `open` function is never called, indicating that the task does not perform any file operations directly.
- `test_create_project`: Validates the `create_project` task by mocking file I/O and asserting that the task writes the expected data to the specified file path. It generates sample data rows and calls the task with each row, mocking the `open` function to capture the contents written to the file.
- `test_create_project_stress`: An asynchronous test that simulates a stress scenario by executing a large number of `create_project` tasks concurrently. It repeats a set of sample data rows enough times to reach the desired number of task calls, then waits briefly for all tasks to complete. The test asserts `True` at the end to indicate successful execution.
- `test_generate_collector_request`: Validates that `generate_collector_request` produces a valid UUID string, confirming the generation of unique request identifiers (a minimal sketch of this check follows below).
- `test_collector_request_triggers_sub_collectors`: Verifies the orchestration inside `collector_request` that triggers the subcollector tasks. It mocks the `group` method and the signature of `collector_gathering` to intercept and assert their usage without executing the actual tasks.
- Interpreting Results: Pytest will provide a summary indicating which tests passed, failed, or were skipped. For any failures, detailed error information and traceback will be provided to aid in debugging.
These test cases ensure that the Celery tasks perform as expected under different scenarios, including normal operation, stress testing, and error conditions. They help verify the correctness and robustness of the task implementations and provide confidence in the overall reliability of the Celery workflow.
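For illustration, here is a hedged sketch of the simplest of these checks; the import path is an assumption based on the file names mentioned above, not the project's actual layout:

```python
import uuid

from poc_celery.tasks_collectors import generate_collector_request  # assumed path


def test_generate_collector_request_returns_uuid():
    request_id = generate_collector_request("some-topic")
    assert isinstance(request_id, str)
    # uuid.UUID raises ValueError if the string is not a valid UUID.
    assert uuid.UUID(request_id)
```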
In this project, we harness Celery's powerful asynchronous task execution to bolster performance and scalability. Here are some key best practices we've applied:
- Decoupling Components: By employing Celery, we decouple task execution from the main application flow. This separation allows for a more scalable and maintainable architecture.
- Optimizing Task Execution: Using Celery's `group`, `chain`, and `chord` primitives, we structure complex task workflows that maximize concurrency and minimize processing time. This ensures tasks are executed in an optimal sequence and only after all required dependencies have been satisfied.
- Robust Error Handling: We implement strategic error handling within our tasks to ensure resilience. By catching exceptions and using retry mechanisms where applicable, we maintain the integrity of the task pipeline even in the face of transient failures (see the retry sketch after this list).
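Below is a hedged sketch of the retry pattern referred to above; the task name, exception type, and backoff settings are illustrative, not taken from the project:

```python
from celery import shared_task


@shared_task(
    bind=True,
    autoretry_for=(ConnectionError,),  # retry automatically on transient failures
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=3,                     # give up after three retries
)
def fetch_remote(self, url: str) -> str:
    # Placeholder body: any ConnectionError raised here triggers a retry.
    raise ConnectionError("simulated transient failure")
```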
Flower is a critical tool for our project, providing comprehensive monitoring and management capabilities for our Celery workers and tasks. Here's why Flower is indispensable:
- Visibility: Flower's real-time monitoring gives us instant visibility into task queues, worker status, and the progress of task execution. This level of insight is invaluable for quickly identifying bottlenecks or failures in the pipeline.
- Direct Task Management: Through Flower's web interface, we gain direct control over task execution. The ability to cancel tasks, restart workers, and adjust task priorities on the fly helps maintain optimal task flow under varying load conditions.
- Insights for Debugging and Optimization: Flower's detailed execution statistics for each task guide our debugging and optimization efforts. Analyzing task durations, success rates, and retry counts helps us pinpoint inefficiencies and improve the overall performance of the workflows.
- Remote Accessibility: Flower's web-based interface means we can monitor and manage the task environment from any location, facilitating remote debugging and administration without direct server access.
Additional practices applied in this project include:

- Environment Isolation: Using virtual environments and containerization (e.g., Docker) for the Celery and RabbitMQ/Redis setup ensures consistency across development, testing, and production environments, reducing the "it works on my machine" syndrome.
- Automated Testing: The project includes a suite of automated tests that validate task logic, asynchronous behavior, and failure-handling scenarios. Integrated into a CI/CD pipeline, these tests ensure code quality and prevent regressions.
- Documentation and Logging: Comprehensive documentation, including clear docstrings and READMEs, alongside detailed logging within tasks, makes maintenance easier and accelerates onboarding for new developers (see the logging sketch below).
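As an example of per-task logging, here is a minimal sketch using Celery's task logger; the task body mirrors the `process` task described earlier, but the exact logging calls are an assumption, not the project's code:

```python
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task
def process(data: list, request_id: str) -> dict:
    # Log through Celery's task logger so messages carry the task name and ID.
    logger.info("Processing %d items for request %s", len(data), request_id)
    return {"request_id": request_id, "item_count": len(data)}
```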
This PoC demonstrates a scalable and efficient way to manage distributed tasks using Celery with RabbitMQ as the message broker and Redis as the result backend. It exemplifies a practical application of Celery's capabilities in handling complex workflows with multiple asynchronous tasks, showcasing a system that can be adapted for a variety of data processing and aggregation scenarios.