finish text import pipeline and start image pipeline (still buggy)
AhmadHAW committed Oct 15, 2024
1 parent bf3bdde commit 4b4170a
Showing 19 changed files with 259 additions and 113 deletions.
6 changes: 4 additions & 2 deletions backend/src/app/celery/background_jobs/preprocess.py
@@ -31,8 +31,10 @@ def execute_text_preprocessing_pipeline_(
pipeline.execute(cargo=cargo)


def execute_image_preprocessing_pipeline_(cargo: PipelineCargo) -> None:
pipeline = prepro.get_image_pipeline()
def execute_image_preprocessing_pipeline_(
cargo: PipelineCargo, is_init: bool = True
) -> None:
pipeline = prepro.get_image_pipeline(is_init=is_init)
logger.debug(
f"Executing image Preprocessing Pipeline\n\t{pipeline}\n\t for cargo"
f" {cargo.ppj_payload.filename}!"
6 changes: 4 additions & 2 deletions backend/src/app/celery/background_jobs/tasks.py
@@ -89,8 +89,10 @@ def execute_text_preprocessing_pipeline_task(
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 5, "countdown": 5},
)
def execute_image_preprocessing_pipeline_task(cargo: PipelineCargo) -> None:
execute_image_preprocessing_pipeline_(cargo=cargo)
def execute_image_preprocessing_pipeline_task(
cargo: PipelineCargo, is_init: bool = True
) -> None:
execute_image_preprocessing_pipeline_(cargo=cargo, is_init=is_init)


@celery_worker.task(
48 changes: 48 additions & 0 deletions backend/src/app/core/data/crud/bbox_annotation.py
@@ -45,6 +45,54 @@ def create(

return db_obj

def create_multi(
self, db: Session, *, create_dtos: List[BBoxAnnotationCreateIntern]
) -> List[BBoxAnnotationORM]:
# update all affected annotation documents' timestamp
adoc_ids = list(
set([create_dto.annotation_document_id for create_dto in create_dtos])
)
for adoc_id in adoc_ids:
crud_adoc.update_timestamp(db=db, id=adoc_id)

return super().create_multi(db=db, create_dtos=create_dtos)

def create_bulk(
self, db: Session, *, user_id: int, create_dtos: List[BBoxAnnotationCreate]
) -> List[BBoxAnnotationORM]:
# group by user and sdoc_id
# identify codes
annotations_by_user_sdoc = {
(user_id, create_dto.sdoc_id): [] for create_dto in create_dtos
}
for create_dto in create_dtos:
annotations_by_user_sdoc[(user_id, create_dto.sdoc_id)].append(create_dto)

# find or create annotation documents for each user and sdoc_id
adoc_id_by_user_sdoc = {}
for user_id, sdoc_id in annotations_by_user_sdoc.keys():
adoc_id_by_user_sdoc[(user_id, sdoc_id)] = crud_adoc.exists_or_create(
db=db, user_id=user_id, sdoc_id=sdoc_id
).id

# create the annotations
return self.create_multi(
db=db,
create_dtos=[
BBoxAnnotationCreateIntern(
x_max=create_dto.x_max,
y_max=create_dto.y_max,
x_min=create_dto.x_min,
y_min=create_dto.y_min,
code_id=create_dto.code_id,
annotation_document_id=adoc_id_by_user_sdoc[
(user_id, create_dto.sdoc_id)
],
)
for create_dto in create_dtos
],
)

def read_by_user_and_sdoc(
self,
db: Session,
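A hedged usage sketch of the new create_bulk helper (the import paths, the crud singleton name, and the exact field set of BBoxAnnotationCreate are assumptions for illustration, not part of this commit):

from sqlalchemy.orm import Session

from app.core.data.crud.bbox_annotation import crud_bbox_anno  # assumed singleton name
from app.core.data.dto.bbox_annotation import BBoxAnnotationCreate  # assumed DTO module


def annotate_image(db: Session) -> None:
    # Hypothetical ids: for each (user, sdoc) pair create_bulk finds or creates
    # the AnnotationDocument, bumps its timestamp via create_multi, and then
    # inserts all boxes in one batch.
    crud_bbox_anno.create_bulk(
        db=db,
        user_id=1,
        create_dtos=[
            BBoxAnnotationCreate(
                x_min=10, y_min=20, x_max=110, y_max=220, code_id=42, sdoc_id=7
            ),
        ],
    )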
9 changes: 9 additions & 0 deletions backend/src/app/core/data/crud/user.py
@@ -36,6 +36,15 @@ def read_by_email(self, db: Session, *, email: str) -> UserORM:
raise NoSuchElementError(self.model, email=email)
return user

def read_by_email_if_exists(self, db: Session, *, email: str) -> Optional[UserORM]:
user = (
db.query(self.model)
.options(joinedload(self.model.projects))
.filter(self.model.email == email)
.first()
)
return user

def authenticate(self, db: Session, user_login: UserLogin) -> Optional[UserORM]:
try:
user = self.read_by_email(db=db, email=user_login.username)
16 changes: 16 additions & 0 deletions backend/src/app/core/data/dto/source_document_metadata.py
@@ -149,6 +149,22 @@ def get_value(self) -> Union[str, int, datetime, bool, List, None]:
return self.list_value
return None

def get_value_serializable(self) -> Union[str, int, bool, List, None]:
match self.project_metadata.metatype:
case MetaType.STRING:
return self.str_value
case MetaType.NUMBER:
return self.int_value
case MetaType.DATE:
return (
self.date_value.isoformat() if self.date_value else self.date_value
)
case MetaType.BOOLEAN:
return self.boolean_value
case MetaType.LIST:
return self.list_value
return None

@staticmethod
def with_value(
sdoc_metadata_id: int,
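The new get_value_serializable exists because datetime objects do not survive json.dumps; DATE metadata is converted to an ISO-8601 string so the export JSON can be written. A minimal standalone illustration (not using the DTO itself):

import json
from datetime import datetime

published = datetime(2024, 10, 15, 12, 30)

# json.dumps({"value": published}) would raise:
#   TypeError: Object of type datetime is not JSON serializable
serializable = {"value": published.isoformat()}  # what DATE metadata becomes
print(json.dumps(serializable))  # {"value": "2024-10-15T12:30:00"}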
20 changes: 13 additions & 7 deletions backend/src/app/core/data/export/export_service.py
@@ -60,6 +60,7 @@
PROJECT_SDOC_METADATAS_EXPORT_NAMING_TEMPLATE = "project_{project_id}_metadatas"
PROJECT_DETAILS_EXPORT_NAMING_TEMPLATE = "project_{project_id}_details"
PROJECT_SDOC_LINKS_EXPORT_NAMING_TEMPLATE = "project_{project_id}_sdoc_links"
PROJECT_CODES_EXPORT_NAMING_TEMPLATE = "project_{project_id}_codes"
SCHEMA_JSON_EXPORT_NAME = "schema.json"


@@ -219,7 +220,7 @@ def __get_sdocs_metadata_for_export(
metadata_dict = dict()
for metadata in sdoc_metadata_dtos:
metadata_dict[metadata.project_metadata.key] = {
"value": metadata.get_value(),
"value": metadata.get_value_serializable(),
}
exported_sdocs_metadata.append(
{
@@ -288,6 +289,7 @@ def __generate_export_df_for_adoc(
# fill the DataFrame
data = {
"sdoc_name": [],
"user_email": [],
"user_first_name": [],
"user_last_name": [],
"code_name": [],
@@ -305,6 +307,7 @@

for span in span_read_resolved_dtos:
data["sdoc_name"].append(sdoc_dto.filename)
data["user_email"].append(user_dto.email)
data["user_first_name"].append(user_dto.first_name)
data["user_last_name"].append(user_dto.last_name)
data["code_name"].append(span.code.name)
@@ -322,6 +325,7 @@

for bbox in bbox_read_resolved_dtos:
data["sdoc_name"].append(sdoc_dto.filename)
data["user_email"].append(user_dto.email)
data["user_first_name"].append(user_dto.first_name)
data["user_last_name"].append(user_dto.last_name)
data["code_name"].append(bbox.code.name)
@@ -350,6 +354,7 @@ def __generate_export_df_for_span_annotations(
# fill the DataFrame
data = {
"sdoc_name": [],
"user_email": [],
"user_first_name": [],
"user_last_name": [],
"code_name": [],
@@ -363,6 +368,7 @@
sdoc = span.annotation_document.source_document
user = span.annotation_document.user
data["sdoc_name"].append(sdoc.filename)
data["user_email"].append(user.email)
data["user_first_name"].append(user.first_name)
data["user_last_name"].append(user.last_name)
data["code_name"].append(span.code.name)
@@ -471,7 +477,7 @@ def __generate_export_df_for_users_in_project(
users_data_df = pd.DataFrame(data)
return users_data_df

def __generate_export_dict_for_project_metadata(
def __generate_export_dict_for_project_details(
self, db: Session, project_id: int
) -> Dict[str, Union[str, int, datetime]]:
project_data = crud_project.read(db=db, id=project_id)
@@ -751,7 +757,7 @@ def _export_project_codes(
export_file = self.__write_export_data_to_temp_file(
codes,
export_format=export_format,
fn=f"project_{project_id}_codes",
fn=PROJECT_CODES_EXPORT_NAMING_TEMPLATE.format(project_id=project_id),
)
return export_file
msg = f"No Codes to export in Project {project_id}"
@@ -866,14 +872,14 @@ def _export_all_data_from_proj(
db=db, project_id=project_id
)

# generate project meta data
logger.info("exporting project meta data...")
exported_project_metadata = self.__generate_export_dict_for_project_metadata(
# generate project details
logger.info("exporting project details...")
exported_project_details = self.__generate_export_dict_for_project_details(
db=db, project_id=project_id
)
# write project details to files
project_file = self.__write_exported_json_to_temp_file(
exported_file=exported_project_metadata,
exported_file=exported_project_details,
fn=PROJECT_DETAILS_EXPORT_NAMING_TEMPLATE.format(project_id=project_id),
)
exported_files.append(project_file)
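With the new user_email column, an exported annotation row carries enough information for the import side (below) to map annotations back to existing accounts. A hedged sketch of what a span-annotation row might look like; the values are invented and the offset column names are taken from the import code in this commit:

import pandas as pd

# Illustrative row only; values are made up.
row = {
    "sdoc_name": "report_01.txt",
    "user_email": "annotator@example.org",
    "user_first_name": "Ada",
    "user_last_name": "Lovelace",
    "code_name": "PERSON",
    "text": "Ada Lovelace",
    "text_begin_char": 0,
    "text_end_char": 12,
    "text_begin_token": 0,
    "text_end_token": 2,
}
print(pd.DataFrame([row]).to_csv(index=False))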
74 changes: 56 additions & 18 deletions backend/src/app/core/data/import_/import_service.py
@@ -14,8 +14,10 @@

from app.core.data.crud.code import crud_code
from app.core.data.crud.document_tag import crud_document_tag
from app.core.data.crud.preprocessing_job import crud_prepro_job
from app.core.data.crud.project import crud_project
from app.core.data.crud.project_metadata import crud_project_meta
from app.core.data.crud.user import SYSTEM_USER_ID, crud_user
from app.core.data.doc_type import (
DocType,
get_doc_type,
@@ -32,6 +34,7 @@
ImportJobType,
ImportJobUpdate,
)
from app.core.data.dto.preprocessing_job import PreprocessingJobUpdate
from app.core.data.dto.preprocessing_job_payload import (
PreprocessingJobPayloadCreateWithoutPreproJobId,
)
@@ -524,6 +527,23 @@ def __import_tags_to_proj(
logger.info(f"Generated tag id mapping {tag_id_mapping}")
return tag_id_mapping

def __get_user_ids_for_emails_and_link_to_project(
self,
db: Session,
df: pd.DataFrame,
proj_id: int,
) -> Dict[str, int]:
email_id_mapping: Dict[str, int] = dict()
for _, row in df.iterrows():
user_orm = crud_user.read_by_email_if_exists(db=db, email=row["email"])
if user_orm:
email_id_mapping[row["email"]] = user_orm.id
if user_orm.id != SYSTEM_USER_ID:
crud_project.associate_user(
db=db, proj_id=proj_id, user_id=user_orm.id
)
return email_id_mapping

def _import_project(
self,
db: Session,
@@ -547,6 +567,7 @@ def _import_project(
"codes": project_codes.csv
"sdoc_links": project_sdoc_links.csv
"tags": project_tags.csv
"users": project_users.csv
}
// you definitely need this internally
sdoc_filepaths = {
@@ -582,6 +603,14 @@ def _import_project(
for _, row in metadata_mapping_df.iterrows():
self.__import_project_metadata(row, db=db, proj_id=proj_id)

# import all users (link existing users to the new project)
user_data_df = pd.read_csv(expected_file_paths["users"])
user_email_id_mapping = self.__get_user_ids_for_emails_and_link_to_project(
db=db,
df=user_data_df,
proj_id=proj_id,
)

# import codes
codes_df = pd.read_csv(expected_file_paths["codes"])
self.__import_codes_to_proj(db=db, df=codes_df, project_id=proj_id)
@@ -668,15 +697,17 @@ def _import_project(
# create AutoSpans for NER
annotations_for_sdoc: set[AutoSpan] = set()
for _, row in sdoc_annotations_df.iterrows():
auto = AutoSpan(
code=row["code_name"],
start=row["text_begin_char"],
end=row["text_end_char"],
text=row["text"],
start_token=row["text_begin_token"],
end_token=row["text_end_token"],
)
annotations_for_sdoc.add(auto)
if row["user_email"] in user_email_id_mapping:
auto = AutoSpan(
code=row["code_name"],
start=row["text_begin_char"],
end=row["text_end_char"],
text=row["text"],
start_token=row["text_begin_token"],
end_token=row["text_end_token"],
user_id=user_email_id_mapping[row["user_email"]],
)
annotations_for_sdoc.add(auto)
annotations.append(annotations_for_sdoc)
logger.info(f"Generate sdoc annotations {annotations_for_sdoc}")

@@ -690,6 +721,7 @@ def _import_project(
y_min=row["bbox_y_min"],
x_max=row["bbox_x_max"],
y_max=row["bbox_y_max"],
user_id=user_email_id_mapping[row["user_email"]],
)
bboxes_for_sdoc.add(bbox)
bboxes.append(bboxes_for_sdoc)
@@ -730,23 +762,28 @@ def _import_project(

# 4. init text pipelines
from app.celery.background_jobs.tasks import (
execute_image_preprocessing_pipeline_task,
execute_text_preprocessing_pipeline_task,
)

text_tasks = [
tasks = [
execute_text_preprocessing_pipeline_task.s(cargo, is_init=False)
for cargo in cargos[DocType.text]
]

# 5. init image pipelines
# image_tasks = [
# execute_image_preprocessing_pipeline_task.s(cargo, is_init=False)
# for cargo in cargos[DocType.image]
# ]

# combine all tasks in one group
tasks = text_tasks
gr = group(tasks)()
image_tasks = [
execute_image_preprocessing_pipeline_task.s(cargo, is_init=False)
for cargo in cargos[DocType.image]
]
tasks.extend(image_tasks)
crud_prepro_job.update(
db=db,
uuid=ppj.id,
update_dto=PreprocessingJobUpdate(status=BackgroundJobStatus.RUNNING),
)
logger.info(f"Starting {len(tasks)} tasks on ppj {ppj.id}")
gr = group(*tasks)()
logger.info(f"-------------{gr}")

except Exception as e:
Expand Down Expand Up @@ -822,6 +859,7 @@ def __read_import_project_files(self, temp_proj_path: Path) -> Tuple[Dict, Dict]
"codes": project_codes.csv
"sdoc_links": project_sdoc_links.csv
"tags": project_tags.csv
"users": users.csv
}
sdocs = {
"sdoc_filename":{
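On import, only the email column of the users CSV is consulted: addresses that resolve to an existing account are linked to the new project (the system user is skipped), and span annotations whose user_email cannot be resolved are dropped. A minimal sketch of a file that would satisfy __get_user_ids_for_emails_and_link_to_project; the extra columns are assumptions and are ignored by the code shown above:

import io

import pandas as pd

# Illustrative project_users.csv content; only the "email" column is read.
csv_text = """email,first_name,last_name
annotator@example.org,Ada,Lovelace
reviewer@example.org,Alan,Turing
"""

user_data_df = pd.read_csv(io.StringIO(csv_text))
print(user_data_df["email"].tolist())
# ['annotator@example.org', 'reviewer@example.org']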
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ class AutoBBox(BaseModel, frozen=True):
y_min: int
x_max: int
y_max: int
user_id: int
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Set
from typing import Dict, Set

from pydantic import Field

@@ -7,4 +7,4 @@


class PreProImageDoc(PreProDocBase):
bboxes: Set[AutoBBox] = Field(default_factory=list)
bboxes: Dict[str, Set[AutoBBox]] = Field(default_factory=dict)
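PreProImageDoc now groups bounding boxes under a string key instead of keeping a flat set. The diff does not show what that key represents; given the per-user additions elsewhere in the commit it is plausibly the annotating user's email, but treat that as an assumption. A minimal sketch of filling the new structure:

from typing import Dict, Set

from app.preprocessing.pipeline.model.image.autobbox import AutoBBox  # assumed path

# AutoBBox is frozen (hashable), so instances can live in a set; the dict key
# and the presence of a "code" field are assumptions for illustration.
bboxes: Dict[str, Set[AutoBBox]] = {}
bboxes.setdefault("annotator@example.org", set()).add(
    AutoBBox(code="PERSON", x_min=10, y_min=20, x_max=110, y_max=220, user_id=1)
)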
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union

@@ -29,7 +30,7 @@ class PreProDocBase(BaseModel):
)
)

metadata: Dict[str, Union[str, List[str]]] = Field(
metadata: Dict[str, Union[str, List[str], bool, int, datetime]] = Field(
description=(
"A container to store all metadata generated during the preprocessing "
"that will be persisted in the database."
Original file line number Diff line number Diff line change
@@ -8,3 +8,4 @@ class AutoSpan(BaseModel, frozen=True):
end: int
start_token: int
end_token: int
user_id: int