Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test three containers in Github Actions #101

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ services:
env_file: .env
networks:
- pdf_metadata_extraction_network
pdf_metadata_extraction_queue_processor:
container_name: pdf_metadata_extraction_queue_processor
init: true
entrypoint: [ "python3", "-m", "src.start_queue_processor" ]
restart: unless-stopped
build:
context: .
dockerfile: Dockerfile
volumes:
- data:/app/models_data
depends_on:
- mongo_metadata_extraction
- redis_metadata_extraction
environment:
- ENVIRONMENT=${ENVIRONMENT:-development}
- SENTRY_DSN=${SENTRY_DSN:-}
networks:
- pdf_metadata_extraction_network
env_file: .env
pdf_metadata_extraction_worker:
container_name: pdf_metadata_extraction_worker
init: true
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ pymongo==4.6.3
sentry-sdk==2.8.0
redis==5.0.7
requests==2.32.3
git+https://github.com/huridocs/queue-processor@2a961d0f3e579a63a439da058a023d04973449b2
git+https://github.com/huridocs/trainable-entity-extractor@944f843b2171e100de063dc63e1e34d726e7bf3d
git+https://github.com/huridocs/ml-cloud-connector.git@c652ec05b58bb3cdd6303f04729ed4bf57e59fc4
git+https://github.com/huridocs/queue-processor@cc30c4b257e1d517f353d6c65074aa5d8c908270
git+https://github.com/huridocs/trainable-entity-extractor@0911e2d4c5978db34d938a9983180bca0b26040b
159 changes: 90 additions & 69 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from os.path import join

import pymongo
from catch_exceptions import catch_exceptions
from fastapi import FastAPI, HTTPException, UploadFile, File
import sys

from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
import sentry_sdk
from starlette.concurrency import run_in_threadpool
from trainable_entity_extractor.XmlFile import XmlFile
from trainable_entity_extractor.config import config_logger
from trainable_entity_extractor.data.ExtractionIdentifier import ExtractionIdentifier
Expand All @@ -18,8 +20,11 @@
from trainable_entity_extractor.data.Suggestion import Suggestion
from trainable_entity_extractor.send_logs import send_logs

from Extractor import Extractor
from config import MONGO_HOST, MONGO_PORT, DATA_PATH
from data.ExtractionTask import ExtractionTask
from data.Options import Options
from data.Params import Params


@asynccontextmanager
Expand Down Expand Up @@ -57,101 +62,117 @@ async def error():


@app.post("/xml_to_train/{tenant}/{extraction_id}")
@catch_exceptions
async def to_train_xml_file(tenant, extraction_id, file: UploadFile = File(...)):
filename = '"No file name! Probably an error about the file in the request"'
try:
filename = file.filename
xml_file = XmlFile(
extraction_identifier=ExtractionIdentifier(
run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH
),
to_train=True,
xml_file_name=filename,
)
xml_file.save(file=file.file.read())
return "xml_to_train saved"
except Exception:
config_logger.error(f"Error adding task {filename}", exc_info=1)
raise HTTPException(status_code=422, detail=f"Error adding task {filename}")
filename = file.filename
xml_file = XmlFile(
extraction_identifier=ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH),
to_train=True,
xml_file_name=filename,
)
xml_file.save(file=file.file.read())
return "xml_to_train saved"


@app.post("/xml_to_predict/{tenant}/{extraction_id}")
@catch_exceptions
async def to_predict_xml_file(tenant, extraction_id, file: UploadFile = File(...)):
filename = '"No file name! Probably an error about the file in the request"'
try:
filename = file.filename
xml_file = XmlFile(
extraction_identifier=ExtractionIdentifier(
run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH
),
to_train=False,
xml_file_name=filename,
)
xml_file.save(file=file.file.read())
return "xml_to_train saved"
except Exception:
config_logger.error(f"Error adding task {filename}", exc_info=1)
raise HTTPException(status_code=422, detail=f"Error adding task {filename}")
filename = file.filename
xml_file = XmlFile(
extraction_identifier=ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH),
to_train=False,
xml_file_name=filename,
)
xml_file.save(file=file.file.read())
return "xml_to_train saved"


@app.post("/labeled_data")
@catch_exceptions
async def labeled_data_post(labeled_data: LabeledData):
try:
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
pdf_metadata_extraction_db.labeled_data.insert_one(labeled_data.scale_down_labels().to_dict())
return "labeled data saved"
except Exception:
config_logger.error("Error", exc_info=1)
raise HTTPException(status_code=422, detail="An error has occurred. Check graylog for more info")
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
pdf_metadata_extraction_db.labeled_data.delete_many(
{"tenant": labeled_data.tenant, "id": labeled_data.id, "xml_file_name": labeled_data.xml_file_name}
)
pdf_metadata_extraction_db.labeled_data.insert_one(labeled_data.scale_down_labels().to_dict())
return "labeled data saved"


@app.post("/prediction_data")
@catch_exceptions
async def prediction_data_post(prediction_data: PredictionData):
try:
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
pdf_metadata_extraction_db.prediction_data.insert_one(prediction_data.to_dict())
return "prediction data saved"
except Exception:
config_logger.error("Error", exc_info=1)
raise HTTPException(status_code=422, detail="An error has occurred. Check graylog for more info")
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
pdf_metadata_extraction_db.labeled_data.delete_many(
{"tenant": prediction_data.tenant, "id": prediction_data.id, "xml_file_name": prediction_data.xml_file_name}
)
pdf_metadata_extraction_db.prediction_data.insert_one(prediction_data.to_dict())
return "prediction data saved"


@app.get("/get_suggestions/{tenant}/{extraction_id}")
@catch_exceptions
async def get_suggestions(tenant: str, extraction_id: str):
try:
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
suggestions_filter = {"tenant": tenant, "id": extraction_id}
suggestions_list: list[str] = list()
pdf_metadata_extraction_db = app.mongodb_client["pdf_metadata_extraction"]
suggestions_filter = {"tenant": tenant, "id": extraction_id}
suggestions_list: list[str] = list()

for document in pdf_metadata_extraction_db.suggestions.find(suggestions_filter):
suggestions_list.append(Suggestion(**document).scale_up().to_output())
for document in pdf_metadata_extraction_db.suggestions.find(suggestions_filter):
suggestions_list.append(Suggestion(**document).scale_up().to_output())

pdf_metadata_extraction_db.suggestions.delete_many(suggestions_filter)
extraction_identifier = ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH)
send_logs(extraction_identifier, f"{len(suggestions_list)} suggestions queried")
pdf_metadata_extraction_db.suggestions.delete_many(suggestions_filter)
extraction_identifier = ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH)
send_logs(extraction_identifier, f"{len(suggestions_list)} suggestions queried")

return json.dumps(suggestions_list)
except Exception:
config_logger.error("Error", exc_info=1)
raise HTTPException(status_code=422, detail="An error has occurred. Check graylog for more info")
return json.dumps(suggestions_list)


@app.delete("/{tenant}/{extraction_id}")
async def get_suggestions(tenant: str, extraction_id: str):
async def delete_model(tenant: str, extraction_id: str):
shutil.rmtree(join(DATA_PATH, tenant, extraction_id), ignore_errors=True)
return True


@app.get("/get_status/{tenant}/{extraction_id}")
async def get_satus(tenant: str, extraction_id: str):
extraction_identifier = ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH)
return extraction_identifier.get_status()


@app.post("/train/{tenant}/{extraction_id}")
@catch_exceptions
async def train(tenant: str, extraction_id: str):
extraction_identifier = ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH)
params = Params(
id=extraction_id, options=extraction_identifier.get_options(), multi_value=extraction_identifier.get_multi_value()
)
task = ExtractionTask(tenant=tenant, task=Extractor.CREATE_MODEL_TASK_NAME, params=params)
run_in_threadpool(Extractor.calculate_task, task)
return True


@app.post("/predict/{tenant}/{extraction_id}")
@catch_exceptions
async def predict(tenant: str, extraction_id: str):
extraction_identifier = ExtractionIdentifier(run_name=tenant, extraction_name=extraction_id, output_path=DATA_PATH)
params = Params(
id=extraction_id, options=extraction_identifier.get_options(), multi_value=extraction_identifier.get_multi_value()
)
task = ExtractionTask(tenant=tenant, task=Extractor.SUGGESTIONS_TASK_NAME, params=params)
run_in_threadpool(Extractor.calculate_task, task)
return True


@app.post("/options")
@catch_exceptions
def save_options(options: Options):
try:
extraction_identifier = ExtractionIdentifier(
run_name=options.tenant, extraction_name=options.extraction_id, output_path=DATA_PATH
)
extraction_identifier.save_options(options.options)
os.utime(extraction_identifier.get_options_path().parent)
config_logger.info(f"Options {options.options[:150]} saved for {extraction_identifier}")
return True
except Exception:
config_logger.error("Error", exc_info=1)
raise HTTPException(status_code=422, detail="An error has occurred. Check graylog for more info")
extraction_identifier = ExtractionIdentifier(
run_name=options.tenant, extraction_name=options.extraction_id, output_path=DATA_PATH
)
extraction_identifier.save_options(options.options)

if options.multi_value is not None:
extraction_identifier.save_multi_value(options.multi_value)

os.utime(extraction_identifier.get_options_path().parent)
config_logger.info(f"Options {options.options[:150]} saved for {extraction_identifier}")
return True
22 changes: 22 additions & 0 deletions src/catch_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from functools import wraps
from fastapi import HTTPException

from configuration import service_logger


def catch_exceptions(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="No xml file")
except Exception:
try:
if kwargs and "file" in kwargs:
service_logger.info(f"Error adding task {kwargs['file'].filename}")
except Exception:
service_logger.error("Error see traceback", exc_info=1)
raise HTTPException(status_code=422, detail="Error see traceback")

return wrapper
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

SERVICE_HOST = os.environ.get("SERVICE_HOST", "http://127.0.0.1")
SERVICE_PORT = os.environ.get("SERVICE_PORT", "5056")
METADATA_EXTRACTOR_PORT = os.environ.get("METADATA_EXTRACTOR_PORT", "5066")
REDIS_HOST = os.environ.get("REDIS_HOST", "127.0.0.1")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
MONGO_HOST = os.environ.get("MONGO_HOST", "mongodb://127.0.0.1")
Expand Down
1 change: 1 addition & 0 deletions src/data/Options.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ class Options(BaseModel):
tenant: str
extraction_id: str
options: list[Option]
multi_value: bool | None = None
49 changes: 45 additions & 4 deletions src/start_queue_processor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import os

import pymongo
import requests
import torch
from configuration import service_logger
from ml_cloud_connector.MlCloudConnector import MlCloudConnector
from ml_cloud_connector.ServerType import ServerType
from pydantic import ValidationError
from queue_processor.QueueProcessor import QueueProcessor
from sentry_sdk.integrations.redis import RedisIntegration
import sentry_sdk
from trainable_entity_extractor.config import config_logger
from trainable_entity_extractor.data.ExtractionIdentifier import ExtractionIdentifier
from trainable_entity_extractor.data.ExtractionStatus import ExtractionStatus
from trainable_entity_extractor.send_logs import send_logs

from config import (
Expand All @@ -14,7 +21,7 @@
REDIS_HOST,
REDIS_PORT,
QUEUES_NAMES,
DATA_PATH,
DATA_PATH, METADATA_EXTRACTOR_PORT, MONGO_HOST, MONGO_PORT,
)
from data.ExtractionTask import ExtractionTask
from data.ResultsMessage import ResultsMessage
Expand All @@ -24,16 +31,50 @@
def restart_condition(message: dict[str, any]) -> bool:
return ExtractionTask(**message).task == Extractor.CREATE_MODEL_TASK_NAME

def calculate_task(extraction_task: ExtractionTask) -> (bool, str):
extractor_identifier = ExtractionIdentifier(
run_name=extraction_task.tenant,
extraction_name=extraction_task.params.id,
metadata=extraction_task.params.metadata,
output_path=DATA_PATH,
)

Extractor.remove_old_models(extractor_identifier)

if extraction_task.task == Extractor.CREATE_MODEL_TASK_NAME:
return Extractor.create_model(extractor_identifier, extraction_task.params)
elif extraction_task.task == Extractor.SUGGESTIONS_TASK_NAME:
return Extractor.create_suggestions(extractor_identifier, extraction_task.params)
else:
return False, f"Task {extraction_task.task} not recognized"


def process(message: dict[str, any]) -> dict[str, any] | None:
def should_wait(task):
mongo_client = pymongo.MongoClient(f"{MONGO_HOST}:{MONGO_PORT}")
ml_cloud_connector = MlCloudConnector(ServerType.METADATA_EXTRACTOR, service_logger)
ip = ml_cloud_connector.get_ip()
status = requests.get(f"http://{ip}:{METADATA_EXTRACTOR_PORT}/get_status/{task.tenant}/{task.params.id}")
if status.status_code != 200:
return True

if ExtractionStatus(int(status.json())) == ExtractionStatus.PROCESSING:
return True

return False


def process_messages(message: dict[str, any]) -> dict[str, any] | None:
try:
task = ExtractionTask(**message)
config_logger.info(f"New task {message}")
except ValidationError:
config_logger.error(f"Not a valid Redis message: {message}")
return None

task_calculated, error_message = Extractor.calculate_task(task)
if should_wait(task):
return None

task_calculated, error_message = calculate_task(task)

if task_calculated:
data_url = None
Expand Down Expand Up @@ -92,4 +133,4 @@ def task_to_string(extraction_task: ExtractionTask):
config_logger.info(f"Waiting for messages. Is GPU used? {torch.cuda.is_available()}")
queues_names = QUEUES_NAMES.split(" ")
queue_processor = QueueProcessor(REDIS_HOST, REDIS_PORT, queues_names, config_logger)
queue_processor.start(process, restart_condition)
queue_processor.start(process_messages, restart_condition)