Skip to content

Commit

Permalink
Fixed issues with Celery task typings
Browse files (browse the repository at this point in the history)
  • Loading branch information
bigabig committed Oct 17, 2024
1 parent a2ae292 commit b24dde0
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions backend/src/app/celery/background_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from pathlib import Path
from typing import Any, List

from celery import Task

from app.core.data.crawler.crawler_service import CrawlerService
from app.core.data.dto.crawler_job import CrawlerJobParameters, CrawlerJobRead
from app.core.data.dto.export_job import ExportJobParameters, ExportJobRead
Expand All @@ -15,6 +17,8 @@ def start_cota_refinement_job_async(
) -> None:
from app.celery.background_jobs.tasks import start_cota_refinement_job_task

assert isinstance(start_cota_refinement_job_task, Task), "Not a Celery Task"

start_cota_refinement_job_task.apply_async(kwargs={"cota_job_id": cota_job_id})


Expand All @@ -23,6 +27,8 @@ def start_trainer_job_async(
) -> None:
from app.celery.background_jobs.tasks import start_trainer_job_task

assert isinstance(start_trainer_job_task, Task), "Not a Celery Task"

start_trainer_job_task.apply_async(kwargs={"trainer_job_id": trainer_job_id})


Expand All @@ -31,6 +37,8 @@ def import_uploaded_archive_apply_async(
) -> Any:
from app.celery.background_jobs.tasks import import_uploaded_archive

assert isinstance(import_uploaded_archive, Task), "Not a Celery Task"

return import_uploaded_archive.apply_async(
kwargs={"archive_file_path_and_project_id": (archive_file_path, project_id)},
)
Expand All @@ -41,6 +49,8 @@ def prepare_and_start_export_job_async(
) -> ExportJobRead:
from app.celery.background_jobs.tasks import start_export_job

assert isinstance(start_export_job, Task), "Not a Celery Task"

exs: ExportService = ExportService()
ex_job = exs.prepare_export_job(export_params)
start_export_job.apply_async(kwargs={"export_job": ex_job})
Expand All @@ -55,16 +65,19 @@ def prepare_and_start_crawling_job_async(
start_crawler_job,
)

assert isinstance(start_crawler_job, Task), "Not a Celery Task"
assert isinstance(import_uploaded_archive, Task), "Not a Celery Task"

cs: CrawlerService = CrawlerService()
cj = cs.prepare_crawler_job(crawler_params)
start_export_job_chain = (
# crawl the data via scrapy and zip the data
start_crawler_job.signature(kwargs={"crawler_job": cj})
|
# import the zip
# TODO create a PPJ for the import
import_uploaded_archive.signature()
)

job1 = start_crawler_job.signature(kwargs={"crawler_job": cj})
job2 = import_uploaded_archive.signature()

assert job1 is not None, "Job 1 is None"
assert job2 is not None, "Job 2 is None"

start_export_job_chain = job1 | job2
start_export_job_chain.apply_async()

return cj
Expand All @@ -75,6 +88,8 @@ def prepare_and_start_llm_job_async(
) -> LLMJobRead:
from app.celery.background_jobs.tasks import start_llm_job

assert isinstance(start_llm_job, Task), "Not a Celery Task"

llms: LLMService = LLMService()
llm_job = llms.prepare_llm_job(llm_job_params)
start_llm_job.apply_async(kwargs={"llm_job": llm_job})
Expand All @@ -88,6 +103,10 @@ def execute_text_preprocessing_pipeline_apply_async(
execute_text_preprocessing_pipeline_task,
)

assert isinstance(
execute_text_preprocessing_pipeline_task, Task
), "Not a Celery Task"

for cargo in cargos:
execute_text_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})

Expand All @@ -99,6 +118,10 @@ def execute_image_preprocessing_pipeline_apply_async(
execute_image_preprocessing_pipeline_task,
)

assert isinstance(
execute_image_preprocessing_pipeline_task, Task
), "Not a Celery Task"

for cargo in cargos:
execute_image_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})

Expand All @@ -110,6 +133,10 @@ def execute_audio_preprocessing_pipeline_apply_async(
execute_audio_preprocessing_pipeline_task,
)

assert isinstance(
execute_audio_preprocessing_pipeline_task, Task
), "Not a Celery Task"

for cargo in cargos:
execute_audio_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})

Expand All @@ -121,5 +148,9 @@ def execute_video_preprocessing_pipeline_apply_async(
execute_video_preprocessing_pipeline_task,
)

assert isinstance(
execute_video_preprocessing_pipeline_task, Task
), "Not a Celery Task"

for cargo in cargos:
execute_video_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})

0 comments on commit b24dde0

Please sign in to comment.