diff --git a/.env b/.env
index 6c3520a1..292a1041 100644
--- a/.env
+++ b/.env
@@ -3,7 +3,7 @@
 #
 # traefik variables
-TRAEFIK_DASHBOARD_PORT=8081
+TRAEFIK_DASHBOARD_PORT=8082
 TRAEFIK_HTTP_PORT=80
 TRAEFIK_HTTPS_PORT=443
diff --git a/clustering/src/ProcessingResource.py b/clustering/src/ProcessingResource.py
index e50de833..adfbcb45 100644
--- a/clustering/src/ProcessingResource.py
+++ b/clustering/src/ProcessingResource.py
@@ -9,11 +9,13 @@
 import json
 import os
 import requests
+import numpy as np


 class ProcessingResource:
     __logger = getLogger(__name__)
     __clustering: Clustering = Clustering()
+    vocab_len = 42024

     def __default(self, o) -> int :
         if isinstance(o, int64): return int(o)
@@ -31,12 +33,18 @@ def processTask(self, data):
             raise requireTwoEmbeddings

         embeddings: List[Embedding] = list(map(lambda dict: Embedding.from_dict(dict), data['embeddings']))
+
         if len(embeddings) < 2:
             self.__logger.error("{} ({})".format(requireTwoEmbeddings.title, requireTwoEmbeddings.description))
             raise requireTwoEmbeddings

         self.__logger.info("Computing clusters of {} embeddings.".format(len(embeddings)))
         vectors: List[ElmoVector] = list(map(lambda e: e.vector, embeddings))
+
+        if data["multilingual"]:
+            vectors = list(map(lambda v: self.one_hot_encode(v), vectors))
+            embeddings: List[Embedding] = list(map(lambda e: Embedding.from_wmt_embedding(e.id, self.one_hot_encode(e.vector)), embeddings))
+
         labels, probabilities = self.__clustering.cluster(vectors)
         clusterLabels: List[int] = list(map(lambda i: int(i), set(labels)))
@@ -125,7 +133,7 @@ def getNewTask(self):
             headers = {
                 "Authorization": auth_secret
             }
-            task = requests.get(get_task_url, json={"taskType": "clustering"}, headers=headers, timeout=30)
+            task = requests.get(get_task_url, json={"taskType": "clustering"}, headers=headers, timeout=300)
         except Exception as e:
             self.__logger.error("getTask-API seems to be down: {}".format(str(e)))
             return None
@@ -138,3 +146,11 @@ def getNewTask(self):
         except Exception as e:
             self.__logger.error("Exception while parsing json: {}".format(str(e)))
             return None
+
+    def one_hot_encode(self, vector):
+        encoded_vector = np.zeros(self.vocab_len)
+        # Skip out-of-vocabulary token ids instead of indexing past the end of the vector
+        for i in vector:
+            if i < self.vocab_len:
+                encoded_vector[int(i)] = 1
+        return encoded_vector
diff --git a/clustering/src/entities.py b/clustering/src/entities.py
index 2455c707..d5948dc3 100644
--- a/clustering/src/entities.py
+++ b/clustering/src/entities.py
@@ -17,6 +17,7 @@ def __init__(self, id: str, text: Sentence = None):
     def from_dict(cls, dict: dict) -> 'TextBlock':
         return cls(dict['id'], dict['text'])

+
 class Embedding:
     id: str
     vector: ElmoVector
@@ -28,3 +29,7 @@ def __init__(self, id: str, vector: ElmoVector):
     @classmethod
     def from_dict(cls, dict: dict) -> 'Embedding':
         return cls(dict['id'], dict['vector'])
+
+    @classmethod
+    def from_wmt_embedding(cls, id, vector):
+        return cls(id, vector)
diff --git a/data/db/diagnostic.data/metrics.interim b/data/db/diagnostic.data/metrics.interim
new file mode 100644
index 00000000..0851caaf
Binary files /dev/null and b/data/db/diagnostic.data/metrics.interim differ
diff --git a/docker-compose.override.yml b/docker-compose.override.yml
index 7ab7c07f..ef1d7251 100644
--- a/docker-compose.override.yml
+++ b/docker-compose.override.yml
@@ -4,8 +4,8 @@ version: '3'
 # By that, rebuilding the containers is not needed for code changes locally as long as no requirements change

 services:
-  traefik:
-    # This component does not need to be changed
+  #traefik:
+    #This component does not need to be changed

   load-balancer:
volumes: @@ -19,9 +19,17 @@ services: embedding: volumes: - - ./embedding/src:/usr/src/app/src - - ./text_preprocessing:/usr/src/app/src/text_preprocessing - - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro + #- ./embedding/src:/usr/src/app/src + #- ./text_preprocessing:/usr/src/app/src/text_preprocessing + #- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro + # This will keep the subdirectory as it is in the container + - /usr/src/app/src/resources + + embedding-wmt: + volumes: + - ./embedding_wmt/src:/usr/src/app/src + #- ./text_preprocessing:/usr/src/app/src/text_preprocessing + #- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro # This will keep the subdirectory as it is in the container - /usr/src/app/src/resources @@ -33,5 +41,5 @@ services: volumes: - ./tracking/src:/usr/src/app/src - database: - # This component does not need to be changed + #database: + # This component does not need to be changed \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 5200747c..70d009f3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,7 @@ services: # http://localhost:8081/api # http://localhost:8081/dashboard traefik: - image: traefik:v2.2.1 + image: "traefik:v2.2.1" container_name: athene-traefik restart: unless-stopped depends_on: @@ -83,7 +83,8 @@ services: build: context: . dockerfile: ./embedding/Dockerfile - image: athene-embedding + image: "ghcr.io/ls1intum/athene/athene-embedding:94b958a" + #image: embedding_wmt container_name: athene-embedding restart: unless-stopped depends_on: @@ -105,8 +106,8 @@ services: - CHUNK_SIZE=${EMBEDDING_CHUNK_SIZE} - BALANCER_SENDRESULT_URL=${BALANCER_SENDRESULT_URL} working_dir: /usr/src/app - volumes: - - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro + #volumes: + # - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro networks: - athene labels: @@ -117,6 +118,40 @@ services: - traefik.http.routers.upload-tls.entrypoints=websecure - traefik.http.routers.upload-tls.tls=true + embedding-wmt: + build: + context: . + dockerfile: ./embedding_wmt/Dockerfile + #image: "ghcr.io/ls1intum/athene/athene-embedding:94b958a" + image: embedding_wmt + container_name: athene-embedding-wmt + restart: unless-stopped + depends_on: + - load-balancer + - database +# ports: +# - 8002:8000 + expose: + - 8000 + environment: + - DATABASE_HOST=${DATABASE_HOST} + - DATABASE_PORT=${DATABASE_PORT} + - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_USER=${EMBEDDING_DATABASE_USER} + - DATABASE_PWD=${EMBEDDING_DATABASE_PWD} + - BALANCER_AUTHORIZATION_SECRET=${LOAD_BALANCER_AUTHORIZATION_SECRET} + - BALANCER_QUEUE_FREQUENCY=${BALANCER_QUEUE_FREQUENCY} + - BALANCER_GETTASK_URL=${BALANCER_GETTASK_URL} + - CHUNK_SIZE=${EMBEDDING_CHUNK_SIZE} + - BALANCER_SENDRESULT_URL=${BALANCER_SENDRESULT_URL} + working_dir: /usr/src/app + #volumes: + # - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro + networks: + - athene + labels: + - traefik.enable=true + clustering: build: context: . 
diff --git a/embedding/Makefile b/embedding/Makefile
index b94995f6..015c5d28 100644
--- a/embedding/Makefile
+++ b/embedding/Makefile
@@ -1,2 +1,4 @@
+# Download ELMo model from AllenNLP
+# https://allennlp.org/elmo
 deps:
 	@$(MAKE) -C src/resources/models;
diff --git a/embedding/src/resources/models/Makefile b/embedding/src/resources/models/Makefile
index bea08fc5..2e07f16d 100644
--- a/embedding/src/resources/models/Makefile
+++ b/embedding/src/resources/models/Makefile
@@ -1,4 +1,3 @@
-# Downlaad ELMo model from AllenNLP
 # https://allennlp.org/elmo
 deps: elmo_2x4096_512_2048cnn_2xhighway_5.5B
diff --git a/embedding_wmt/Dockerfile b/embedding_wmt/Dockerfile
new file mode 100644
index 00000000..212123ec
--- /dev/null
+++ b/embedding_wmt/Dockerfile
@@ -0,0 +1,25 @@
+FROM python:3.9
+LABEL author="Jan Philip Bernius "
+
+COPY ./embedding_wmt/requirements.txt /tmp/requirements.txt
+RUN pip install --no-cache-dir -qr /tmp/requirements.txt
+
+
+WORKDIR /usr/src/app
+COPY ./embedding_wmt/src/resources src/resources
+
+RUN apt-get -y update
+RUN apt-get -y install git
+RUN apt-get -y install git-lfs
+
+RUN git clone https://huggingface.co/facebook/wmt19-de-en src/resources/models
+
+
+# Run the image as a non-root user
+RUN groupadd -r textemb && useradd --no-log-init -r -g textemb textemb
+RUN mkdir -p /usr/src/app/logs/ && chown -R textemb:textemb /usr/src/app/
+VOLUME ["/usr/src/app/logs"]
+USER textemb
+
+EXPOSE 8000
+CMD ["uvicorn", "--host", "0.0.0.0", "--port", "8000", "src.main:app"]
diff --git a/embedding_wmt/Makefile b/embedding_wmt/Makefile
new file mode 100644
index 00000000..f1121cf7
--- /dev/null
+++ b/embedding_wmt/Makefile
@@ -0,0 +1,2 @@
+deps:
+	@$(MAKE) -C src/resources;
diff --git a/embedding_wmt/README.md b/embedding_wmt/README.md
new file mode 100644
index 00000000..d53ac472
--- /dev/null
+++ b/embedding_wmt/README.md
@@ -0,0 +1,48 @@
+The service runs on port 8002:
+
+```bash
+uvicorn --port 8002 --host 0.0.0.0 --reload src.main:app
+```
+
+Note: The Artemis configuration references port 8080.
+
+Configurable environment variables:
+
+- BALANCER\_QUEUE\_FREQUENCY
+- BALANCER\_GETTASK\_URL
+- CHUNK\_SIZE
+- BALANCER\_SENDRESULT\_URL
+
+The following API routes are available after startup:
+[http://localhost:8002/trigger](http://localhost:8002/trigger)
+
+
+Input example JSON for POST on http://localhost:8001/embed
+
+```json
+{
+   "courseId" : "1234",
+   "blocks": [
+      {
+         "id" : 1,
+         "text" : "This is the first text block."
+      },
+      {
+         "id" : 2,
+         "text" : "and this is the second one"
+      }
+   ]
+ }
+```
+
+
+Input example JSON for POST on http://localhost:8001/upload
+
+```json
+{
+   "courseId" : "1234",
+   "fileName" : "file.pdf",
+   "fileData" : "a21s5d5sqa354a34575"
+ }
+```
+
diff --git a/embedding_wmt/requirements.txt b/embedding_wmt/requirements.txt
new file mode 100644
index 00000000..966be7a7
--- /dev/null
+++ b/embedding_wmt/requirements.txt
@@ -0,0 +1,12 @@
+# Listed dependencies as of 2021-02-16
+transformers==4.16.1
+docutils==0.18.1
+fastapi==0.73.0
+joblib==1.1.0
+numpy==1.22.1
+requests==2.27.1
+scikit-learn==1.0.2
+scipy==1.7.3
+uvicorn==0.17.1
+torch==1.8.0
+sentencepiece==0.1.96
\ No newline at end of file
diff --git a/embedding_wmt/src/ProcessingResource.py b/embedding_wmt/src/ProcessingResource.py
new file mode 100644
index 00000000..33b53fdb
--- /dev/null
+++ b/embedding_wmt/src/ProcessingResource.py
@@ -0,0 +1,104 @@
+import json
+import os
+import requests
+from typing import List
+from logging import getLogger
+from datetime import datetime
+from numpy import ndarray
+
+from .entities import Sentence, TextBlock, Embedding
+from .errors import requireTwoBlocks
+from .wmt import WMT
+
+
+class ProcessingResource:
+    __logger = getLogger(__name__)
+
+
+    def __default(self, o) -> int:
+        if isinstance(o, Embedding): return o.__dict__
+        if isinstance(o, ndarray): return o.tolist()
+        raise TypeError
+
+    # Starts processing of a queried task
+    def processTask(self, data):
+        self.__logger.debug("-" * 80)
+        self.__logger.info("Start processing Embedding Request:")
+
+        if "blocks" not in data:
+            self.__logger.error("{} ({})".format(requireTwoBlocks.title, requireTwoBlocks.description))
+            raise requireTwoBlocks
+
+        self.__wmt = WMT()
+
+        blocks: List[TextBlock] = list(map(lambda dict: TextBlock.from_dict(dict), data['blocks']))
+
+        sentences: List[Sentence] = list(map(lambda b: b.text, blocks))
+
+        if len(blocks) < 2:
+            self.__logger.error("{} ({})".format(requireTwoBlocks.title, requireTwoBlocks.description))
+            raise requireTwoBlocks
+
+        self.__logger.info("Computing embeddings of {} blocks.".format(len(blocks)))
+
+        vectors: List[List[int]] = self.__wmt.embed_sentences(sentences)
+        embeddings: List[Embedding] = [Embedding(block.id, vectors[i]) for i, block in enumerate(blocks)]
+
+        output = {
+            'embeddings': embeddings
+        }
+
+        self.__logger.info("Computed embeddings:")
+        self.__logger.info(embeddings)
+
+        # The embedding is finished; the result is sent back to the load balancer below
+        self.__logger.info("Completed Embedding Request.")
+        self.__logger.debug("-" * 80)
+
+        output["jobId"] = data["jobId"]
+        output["taskId"] = data["taskId"]
+        # Use the dedicated result type for WMT embeddings
+        output["resultType"] = "embedding_wmt"
+
+        try:
+            self.__logger.info("Writing logfile")
+            with open("logs/embedding-{}.json".format(datetime.now()), 'w') as outfile:
+                json.dump(output, outfile, ensure_ascii=False, default=self.__default)
+        except Exception as e:
+            self.__logger.error("Error while writing logfile: {}".format(str(e)))
+
+        self.__logger.info("Send back embedding-results")
+        # Get container variable for load balancer url
+        send_result_url = str(os.environ['BALANCER_SENDRESULT_URL']) if "BALANCER_SENDRESULT_URL" in os.environ else "http://localhost:8000/sendTaskResult"
+        auth_secret = str(os.environ['BALANCER_AUTHORIZATION_SECRET']) if "BALANCER_AUTHORIZATION_SECRET" in os.environ else ""
+        headers = {
+            "Authorization": auth_secret
+        }
+        response = requests.post(send_result_url, data=json.dumps(output, default=self.__default), headers=headers, timeout=30)
+
+        if response.status_code != 200:
+            self.__logger.error("Sending back failed: {}".format(response.text))
+
+    # Queries the taskQueue and returns the task data (json)
+    def getNewTask(self):
+        try:
+            # Get container variable for load balancer url
+            get_task_url = str(os.environ['BALANCER_GETTASK_URL']) if "BALANCER_GETTASK_URL" in os.environ else "http://localhost:8000/getTask"
+            chunk_size = int(os.environ['CHUNK_SIZE']) if "CHUNK_SIZE" in os.environ else 50
+            auth_secret = str(os.environ['BALANCER_AUTHORIZATION_SECRET']) if "BALANCER_AUTHORIZATION_SECRET" in os.environ else ""
+            headers = {
+                "Authorization": auth_secret
+            }
+            # TODO: request the embedding_wmt task type here
+            task = requests.get(get_task_url, json={"taskType": "embedding", "chunkSize": chunk_size}, headers=headers, timeout=60)
+        except Exception as e:
+            self.__logger.error("getTask-API seems to be down: {}".format(str(e)))
+            return None
+
+        if task.status_code != 200:
+            return None
+
+        try:
+            return task.json()
+        except Exception as e:
+            self.__logger.error("Exception while parsing json: {}".format(str(e)))
+            return None
diff --git a/embedding_wmt/src/TimerHandler.py b/embedding_wmt/src/TimerHandler.py
new file mode 100644
index 00000000..e8ae5542
--- /dev/null
+++ b/embedding_wmt/src/TimerHandler.py
@@ -0,0 +1,95 @@
+from logging import getLogger
+from src.ProcessingResource import ProcessingResource
+import os
+import threading
+
+process_lock = threading.Lock()  # Lock to prevent multiple calculations and restart during calculation
+# Interval to query task queue (in seconds)
+try:
+    timer_frequency = int(os.environ['BALANCER_QUEUE_FREQUENCY']) if "BALANCER_QUEUE_FREQUENCY" in os.environ else 600
+except Exception:
+    timer_frequency = 600
+timer_thread: threading.Thread  # Object holding the Timer-Thread
+timer_lock = threading.Lock()  # Lock to prevent multiple invocations of restarting the Timer-Thread at once
+
+
+class TimerThread(threading.Thread):
+    __logger = getLogger(__name__)
+
+    def __init__(self, sleep_interval=1):
+        super().__init__()
+        self._kill = threading.Event()
+        self._interval = sleep_interval
+
+
+    def run(self):
+        self.__logger.info("Timer-Thread started (Frequency " + str(self._interval) + "s)")
+        processor = ProcessingResource()
+
+        while True:
+            try:
+                # Prevent being blocked by restartTimerThread() function
+                if process_lock.acquire(timeout=5):
+                    self.__logger.info("Querying task queue")
+                    task = processor.getNewTask()
+                    if task is None:
+                        self.__logger.info("No new embedding task available")
+                        process_lock.release()
+                        # Query task queue after timeout again
+                        is_killed = self._kill.wait(self._interval)
+                    else:
+                        self.__logger.info("Process new embedding task")
+                        processor.processTask(task)
+                        process_lock.release()
+                        # Query task queue again without waiting after processing
+                        is_killed = self._kill.wait(0)
+                else:
+                    self.__logger.error("Could not acquire process_lock")
+                    # Query task queue after timeout again
+                    is_killed = self._kill.wait(self._interval)
+            except Exception as e:
+                self.__logger.error("Exception while processing: {}".format(str(e)))
+                process_lock.release()
+                # Query task queue after timeout again
+                is_killed = self._kill.wait(self._interval)
+
+            # If no kill signal is set, sleep for the interval,
+            # If kill signal comes in while sleeping, immediately
+            # wake up and handle
+            #is_killed = self._kill.wait(self._interval)
+            if is_killed:
+                break
+
+        self.__logger.info("Timer-Thread stopped")
+
+    def stop(self):
+        self._kill.set()
+
+
+class TimerHandler:
+
__logger = getLogger(__name__) + + def startTimerThread(self): + global timer_thread + timer_thread = TimerThread(sleep_interval=timer_frequency) + timer_thread.daemon = True # Thread gets killed if main thread exits + timer_thread.start() + + def stopTimerThread(self): + global timer_thread + timer_thread.stop() + timer_thread.join() + + def restartTimerThread(self): + # Prevent unnecessary restarts + if process_lock.acquire(False): + if timer_lock.acquire(False): + self.__logger.info("Restarting timer") + self.stopTimerThread() + self.startTimerThread() + timer_lock.release() + else: + self.__logger.info("Timer restart already requested.") + process_lock.release() + else: + self.__logger.info("Calculation running. No need to restart timer.") \ No newline at end of file diff --git a/embedding_wmt/src/__init__.py b/embedding_wmt/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/embedding_wmt/src/entities.py b/embedding_wmt/src/entities.py new file mode 100644 index 00000000..84774be3 --- /dev/null +++ b/embedding_wmt/src/entities.py @@ -0,0 +1,64 @@ +from typing import List +from numpy import array + +Word = str +Sentence = str +ElmoVector = array + + +class TextBlock: + id: str + text: Sentence + + def __init__(self, id: str, text: Sentence = None): + self.id = id + if text is not None: + self.text = text + + @classmethod + def from_dict(cls, dict: dict) -> 'TextBlock': + return cls(dict['id'], dict['text']) + + +class Embedding: + id: str + vector: List[int] + + def __init__(self, id: str, vector: List[int]): + self.id = id + self.vector = vector + + @classmethod + def from_dict(cls, dict: dict) -> 'Embedding': + return cls(dict['id'], dict['vector']) + +##################################################################################### +class Feedback: + id: str + text: Sentence + score: float + feedbackEmbeddings: [ElmoVector] + + def __init__(self, _id: str, text: Sentence, score: float): + self.id = _id + self.text = text + self.score = score + self.feedbackEmbeddings = [] + + +class FeedbackWithTextBlock: + id: str + cluster_id: str + text: str + text_embedding: ElmoVector + feedback: Feedback + + def __init__(self, _id: str, cluster_id: str, text: str, feedback: Feedback): + self.id = _id + self.cluster_id = cluster_id + self.text = text + self.feedback = feedback + + def add_feedback_embedding(self, embedding: ElmoVector): + self.feedback.feedbackEmbeddings.append(embedding) + diff --git a/embedding_wmt/src/errors.py b/embedding_wmt/src/errors.py new file mode 100644 index 00000000..6edfb2c9 --- /dev/null +++ b/embedding_wmt/src/errors.py @@ -0,0 +1,15 @@ +from fastapi import HTTPException + +invalidJson = HTTPException(status_code=400, detail="Invalid JSON - No valid json provided.") +requireTwoBlocks = HTTPException(status_code=400, + detail="Need two or more blocks - Must provide at least two text blocks.") +requireCourseId = HTTPException(status_code=400, + detail="CourseId not provided - Must provide a course identifier to upload material") +requireFileData = HTTPException(status_code=400, + detail="File data is not provided - Must provide file data to upload material") +requireFileName = HTTPException(status_code=400, + detail="File name is not provided - Must provide file name to upload material") +requireFeedbackWithTextBlock = HTTPException(status_code=400, + detail="Need feedback with text blocks - Must provide feedbackWithTextBlock") +requireExerciseId = HTTPException(status_code=400, + detail="Need exercise id - Must provide exercise id") 
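
Editor's note, a minimal sketch (not part of the patch): it illustrates how the `__default` hook in `embedding_wmt/src/ProcessingResource.py` serializes the `Embedding` entities defined above together with their numpy vectors before the result is posted back to the load balancer. The class below only mirrors the fields from `entities.py`; the sample id and vector values are invented.

```python
import json

import numpy as np


class Embedding:
    # Mirrors embedding_wmt/src/entities.py: an id plus a vector payload
    def __init__(self, id: str, vector):
        self.id = id
        self.vector = vector


def default(o):
    # Same conversion rules as ProcessingResource.__default in the patch
    if isinstance(o, Embedding):
        return o.__dict__
    if isinstance(o, np.ndarray):
        return o.tolist()
    raise TypeError


output = {"embeddings": [Embedding("block-1", np.array([7.0, 42.0, 3.0]))]}
print(json.dumps(output, default=default, ensure_ascii=False))
# -> {"embeddings": [{"id": "block-1", "vector": [7.0, 42.0, 3.0]}]}
```

json.dumps calls the hook recursively, so the dict returned for an `Embedding` can itself contain an ndarray and still end up as a plain JSON list.
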
diff --git a/embedding_wmt/src/main.py b/embedding_wmt/src/main.py
new file mode 100644
index 00000000..80a4a72d
--- /dev/null
+++ b/embedding_wmt/src/main.py
@@ -0,0 +1,25 @@
+import sys
+import logging
+from fastapi import FastAPI, Request, Response, BackgroundTasks
+from src.TimerHandler import TimerHandler
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+handler = logging.StreamHandler(sys.stdout)
+handler.setLevel(logging.DEBUG)
+formatter = logging.Formatter('[%(asctime)s] [%(process)d] [%(levelname)s] [%(name)s] %(message)s')
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+# Start timer-thread on application startup
+timer_handler = TimerHandler()
+timer_handler.startTimerThread()
+
+
+app = FastAPI()
+
+@app.post("/trigger")
+async def trigger(request: Request, response: Response, background_tasks: BackgroundTasks):
+    logger.info("Trigger received")
+    # Restart Timer-Thread if needed to query for new Task instantly (will not stop a currently running computation)
+    background_tasks.add_task(timer_handler.restartTimerThread)
+    return {'Trigger received'}
diff --git a/embedding_wmt/src/resources/Makefile b/embedding_wmt/src/resources/Makefile
new file mode 100644
index 00000000..5c016fce
--- /dev/null
+++ b/embedding_wmt/src/resources/Makefile
@@ -0,0 +1,15 @@
+# Download WMT dense 24 wide model from facebook
+# https://github.com/pytorch/fairseq/tree/main/examples/wmt21
+
+# TODO: download the WMT model via transformers
+models:
+	git clone https://huggingface.co/facebook/wmt21-dense-24-wide-x-en ./models
+
+# Not the right model
+#wmt21.dense-24-wide.X-En:
+#	@curl "https://dl.fbaipublicfiles.com/fairseq/models/wmt21.dense-24-wide.X-En.tar.gz" | tar xz
+
+
+#wmt21.dense-24-wide.X-En:
+#	@curl "https://huggingface.co/facebook/wmt21-dense-24-wide-x-en"
+
diff --git a/embedding_wmt/src/wmt.py b/embedding_wmt/src/wmt.py
new file mode 100644
index 00000000..8d3d15af
--- /dev/null
+++ b/embedding_wmt/src/wmt.py
@@ -0,0 +1,48 @@
+import numpy as np
+from logging import getLogger
+from pathlib import Path
+from typing import List, Tuple
+from scipy.spatial import distance
+from .entities import Sentence
+from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
+TwoSentences = Tuple[Sentence, Sentence]
+
+
+class WMT:
+
+    __logger = getLogger(__name__)
+    __RESOURCE_PATH = (Path.cwd() / "src/resources/models").resolve()
+
+    def __init__(self):
+        self.tokenizer = AutoTokenizer.from_pretrained(WMT.__RESOURCE_PATH)
+        self.model = AutoModelForSeq2SeqLM.from_pretrained(WMT.__RESOURCE_PATH)
+
+
+    def embed_sentences(self, sentences: List[Sentence]) -> List[List[int]]:
+        tokenized_sentences: List[List[str]] = list(map(lambda sentence: self.tokenizer(sentence, return_tensors="pt"), sentences))
+        generated_tokens: List[List[int]] = list(map(lambda tokenized_sentence: self.model.generate(**tokenized_sentence), tokenized_sentences))
+        encoded_tokens: List[List[int]] = list(map(lambda token: self.encode_tensor_as_vector(token), generated_tokens))
+        return encoded_tokens
+
+    def encode_two_sentences(self, sentences: TwoSentences):
+        return self.embed_sentences(list(sentences))
+
+
+    def distance(self, sentences: TwoSentences) -> float:
+        token_vector_1, token_vector_2 = self.encode_two_sentences(sentences)
+        return distance.cosine(token_vector_1, token_vector_2)
+
+    def encode_tensor_as_vector(self, tokens_to_encode):
+        tokens_to_encode = self.cut_special_tokens(tokens_to_encode)
+        len_tokens = len(tokens_to_encode)
+        self.__logger.info(len_tokens)
+        vector =
np.zeros(len_tokens) + for i in range(len_tokens): + vector[i] = tokens_to_encode[i].item() + return vector + + + def cut_special_tokens(self, tokens_to_encode): + return tokens_to_encode[0][1:-1] + + diff --git a/load-balance.yml b/load-balance.yml index a4497105..d9f7d9f3 100644 --- a/load-balance.yml +++ b/load-balance.yml @@ -4,6 +4,7 @@ docker_nodes: - traefik_service_api: http://athene-traefik:8080/api/http/services/ segmentation_service_name: segmentation-athene@docker embedding_service_name: embedding-athene@docker + embedding_wmt_service_name: embedding-wmt-athene@docker clustering_service_name: clustering-athene@docker trigger_route: /trigger diff --git a/load-balancer/README.md b/load-balancer/README.md index 1212473c..3a14acb5 100644 --- a/load-balancer/README.md +++ b/load-balancer/README.md @@ -14,7 +14,7 @@ Input example JSON for POST on http://localhost:8000/submit ```json { "courseId": 2, - "callbackUrl": "http://testurl" + "callbackUrl": "http://testurl", "submissions":[ { "id":1, diff --git a/load-balancer/__init__.py b/load-balancer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/load-balancer/requirements.txt b/load-balancer/requirements.txt index 8c2fc9c9..468d5b67 100644 --- a/load-balancer/requirements.txt +++ b/load-balancer/requirements.txt @@ -6,4 +6,4 @@ gunicorn==20.0.4 numpy==1.18.3 PyYAML==5.4 requests==2.24.0 -protobuf==3.15.8 +protobuf==3.15.8 \ No newline at end of file diff --git a/load-balancer/src/ConfigParser.py b/load-balancer/src/ConfigParser.py index 56b42082..6eceaba8 100644 --- a/load-balancer/src/ConfigParser.py +++ b/load-balancer/src/ConfigParser.py @@ -1,3 +1,5 @@ +import logging + from .entities import NodeType, ComputeNode from logging import getLogger import os.path diff --git a/load-balancer/src/JSONHandler.py b/load-balancer/src/JSONHandler.py new file mode 100644 index 00000000..20d2c664 --- /dev/null +++ b/load-balancer/src/JSONHandler.py @@ -0,0 +1,33 @@ +from .errors import invalidJson +from fastapi import Request + +import logging +import os +import sys + +logger = logging.getLogger() + + +class JSONHandler: + + @staticmethod + async def parseJson(request: Request): + try: + return await request.json() + except Exception as e: + logger.error("Exception while parsing json: {}".format(str(e))) + raise invalidJson + + @staticmethod + def writeJsonToFile(job_id: int, filename: str, data): + # Only write file if log-level is DEBUG + if logger.level == logging.DEBUG: + try: + directory = "logs/job_" + str(job_id) + if not os.path.exists(directory): + os.makedirs(directory) + logger.debug("Writing data to logfile: {}".format(filename)) + with open(directory + "/" + filename + ".json", 'w') as outfile: + outfile.write(data) + except Exception as e: + logger.error("Error while writing logfile: {}".format(str(e))) diff --git a/load-balancer/src/ResultJob.py b/load-balancer/src/ResultJob.py new file mode 100644 index 00000000..02f61f0e --- /dev/null +++ b/load-balancer/src/ResultJob.py @@ -0,0 +1,162 @@ +import hashlib +import logging +import sys +from abc import abstractmethod +from .errors import missingTextBlocks, noUpdateNeeded, invalidResults, missingEmbeddings, missingTaskId, \ + missingClusters, missingDistanceMatrix, missingClusterTree +from .ConfigParser import ConfigParser +import requests +from requests.auth import HTTPBasicAuth +from .entities import Policy, NodeType, JobStatus +from .JSONHandler import JSONHandler, logger + + +class ResultJob: + + @staticmethod + @abstractmethod + def perform_result(self, 
job): + pass + + +def sendBackResults(job): + pass + + +def sizeof(obj): + size = sys.getsizeof(obj) + if isinstance(obj, dict): return size + sum(map(sizeof, obj.keys())) + sum(map(sizeof, obj.values())) + if isinstance(obj, (list, tuple, set, frozenset)): return size + sum(map(sizeof, obj)) + return size + + +def triggerNodes(node_type: str): + node_types = ( + NodeType.segmentation, NodeType.embedding, NodeType.clustering, NodeType.embedding_wmt, NodeType.gpu) + if node_type not in node_types: + logger.error('Invalid node_type: \'{}\''.format(node_type)) + + # Parse compute nodes + config_parser = ConfigParser() + logger.info("Parsing {} nodes".format(node_type)) + nodes = config_parser.parseConfig(node_type) + + # Trigger all parsed nodes + logger.info("Triggering {} nodes".format(node_type)) + for node in nodes: + if node_type == NodeType.gpu: + requests.post(node.url, auth=HTTPBasicAuth(node.username, node.password)) + else: + requests.post(node.url, timeout=5) + + +class SegmentationResult(ResultJob): + + def __init__(self): + pass + + @staticmethod + def send_result(job, result): + logging.info("Segmentation Result") + if "textBlocks" not in result: + raise missingTextBlocks + JSONHandler.writeJsonToFile(job.id, "segmentation_result", result) + # Transform segmentation result to blocks (embedding input) + for block in result["textBlocks"]: + submission_id = int(block["id"]) + start_index = int(block["startIndex"]) + end_index = int(block["endIndex"]) + # Search for the corresponding submission and create block out of segmentation result information + for submission in job.submissions: + if submission["id"] == submission_id: + block_text = submission["text"][start_index:end_index] + id_string = str(submission_id) + ";" \ + + str(start_index) + "-" \ + + str(end_index) + ";" \ + + block_text + block_id = hashlib.sha1(id_string.encode()).hexdigest() + new_block = {"id": block_id, + "submissionId": submission_id, + "text": block_text, + "startIndex": start_index, + "endIndex": end_index, + "type": "AUTOMATIC"} + job.blocks.append(new_block) # Will persist in job + job.blocks_to_embed.append(new_block) # Will get removed with embedding queries + break + job.status = JobStatus.embedding_queued + logger.info("JobId {} transitioned to status {}".format(job.id, job.status)) + # Triggering the right embedding nodes + node_Type = Policy.define_embedding_type(job.multilingual) + triggerNodes(node_type=node_Type) + return {"detail": "Updated job: processed segmentation results"} + + +class EmbeddingResult: + + def __init__(self): + pass + + @staticmethod + def send_result(job, result): + + logging.info("Embedding Result") + + if "embeddings" not in result: + raise missingEmbeddings + if "taskId" not in result: + raise missingTaskId + + JSONHandler.writeJsonToFile(job.id, "embedding_result_" + str(result["taskId"]), result) + + # Add results to job and remove corresponding embedding-task out of queue + valid_results = False + for task in job.embedding_tasks: + if str(task.id) == str(result["taskId"]): + # Check if number of embeddings is correct + if not len(task.blocks) == len(result["embeddings"]): + raise invalidResults + for embedding in result["embeddings"]: + job.embeddings.append(embedding) + valid_results = True + logger.info("embedding-task {} of JobId {} finished".format(task.id, job.id)) + job.embedding_tasks.remove(task) + break + + if not valid_results: + raise noUpdateNeeded + + # Check if all embeddings of job finished + if len(job.blocks_to_embed) == 0 and 
len(job.embedding_tasks) == 0: + job.status = JobStatus.clustering_queued + logger.info("JobId {} transitioned to status {}".format(job.id, job.status)) + # Trigger clustering nodes + triggerNodes(node_type=NodeType.clustering) + + return {"detail": "Updated job: processed embedding results"} + + +class ClusteringResult: + + def __init__(self): + pass + + @staticmethod + def send_result(job, result): + logging.info("Clustering Job") + if "clusters" not in result: + raise missingClusters + if "distanceMatrix" not in result: + raise missingDistanceMatrix + if "clusterTree" not in result: + raise missingClusterTree + + JSONHandler.writeJsonToFile(job.id, "clustering_result", result) + + job.clusters = result["clusters"] + job.distanceMatrix = result["distanceMatrix"] + job.clusterTree = result["clusterTree"] + + # Send back results to Artemis via callback URL in the background + job.status = JobStatus.sending_back + return {"detail": "Updated job: processed clustering results"} \ No newline at end of file diff --git a/load-balancer/src/TaskFactory.py b/load-balancer/src/TaskFactory.py new file mode 100644 index 00000000..ac9640f6 --- /dev/null +++ b/load-balancer/src/TaskFactory.py @@ -0,0 +1,76 @@ +import sys + +from .errors import invalidTaskType, missingTaskType, invalidChunkSize, missingChunkSize, taskTypeError, noUpdateNeeded, \ + invalidJobId +from .TaskTypes import EmbeddingJob, SegmentationJob, ClusteringJob, TaskJob +from .entities import AtheneJob, JobStatus, NodeType, EmbeddingTask +import logging +from.ResultJob import SegmentationResult, EmbeddingResult, ClusteringResult + +logger = logging.getLogger() + +class TaskFactory: + + def __init__(self, job): + self.job = job + pass + + def set_status(task: dict): + # Error handling + if "taskType" not in task: + raise missingTaskType + + if task["taskType"] == "segmentation": + required_status = JobStatus.segmentation_queued + new_status = JobStatus.segmentation_processing + return required_status, new_status + + elif task["taskType"] == "embedding": + if "chunkSize" not in task: + raise missingChunkSize + if int(task["chunkSize"]) < 2: + raise invalidChunkSize + required_status = [JobStatus.embedding_queued, JobStatus.embedding_queued_and_processing] + new_status = JobStatus.embedding_queued_and_processing + return required_status, new_status + + elif task["taskType"] == "clustering": + required_status = JobStatus.clustering_queued + new_status = JobStatus.clustering_processing + return required_status, new_status + else: + raise invalidTaskType + + def define_job(task: dict, job : AtheneJob) -> TaskJob: + if task["taskType"] == "segmentation": + return SegmentationJob(job) + elif task["taskType"] == "embedding": + return EmbeddingJob(task, job) + elif task["taskType"] == "clustering": + return ClusteringJob(job) + else: + logger.error("Error with taskType {}".format(task["taskType"])) + raise taskTypeError + return + + def define_result(job, result): + if job.status == JobStatus.segmentation_processing and result["resultType"] == "segmentation": + return SegmentationResult() + + elif (job.status == JobStatus.embedding_processing + or job.status == JobStatus.embedding_queued_and_processing)\ + and (result["resultType"] == "embedding" or result["resultType"] == "embedding_wmt"): + return EmbeddingResult() + + elif job.status == JobStatus.clustering_processing and result["resultType"] == "clustering": + return ClusteringResult() + # No valid request + else: + logger.info("No such update for jobId {} needed. 
Job status is {}".format(result["jobId"], job.status)) + raise noUpdateNeeded + raise invalidJobId + + + + + diff --git a/load-balancer/src/TaskTypes.py b/load-balancer/src/TaskTypes.py new file mode 100644 index 00000000..987b7338 --- /dev/null +++ b/load-balancer/src/TaskTypes.py @@ -0,0 +1,78 @@ +from abc import abstractmethod + +from .entities import JobStatus, EmbeddingTask +from .JSONHandler import JSONHandler, logger + + + +class TaskJob: + + @staticmethod + @abstractmethod + def perform_task(self): + pass + + +class EmbeddingJob(TaskJob): + def __init__(self, task, job): + self.task = task + self.job = job + pass + + def perform_task(self): + # Create chunk of blocks for embedding node + chunk, rest = self.createEmbeddingChunk(self.job.blocks_to_embed, self.task["chunkSize"]) + self.job.embedding_task_count += 1 + new_task = EmbeddingTask(id=self.job.embedding_task_count, course_id=self.job.course_id, blocks=chunk) + self.job.embedding_tasks.append(new_task) + self.job.blocks_to_embed = rest + if len(self.job.blocks_to_embed) == 0: + self.job.status = JobStatus.embedding_processing + logger.info("embedding-task for JobId {} created: " + "taskId={}, size={}/{} (actual/requested)".format(self.job.id, + new_task.id, + len(new_task.blocks), + self.task["chunkSize"])) + + response_json = {"jobId": self.job.id, + "taskId": new_task.id, + "courseId": new_task.course_id, + "blocks": new_task.blocks, + } + JSONHandler.writeJsonToFile(self.job.id, "embedding_task_" + str(new_task.id), response_json) + return response_json + + def createEmbeddingChunk(self, blocks, size): + # Minimum of 2 blocks is required to compute embeddings - prevent 1 single remaining block + if len(blocks) == (size + 1): + size += 1 + + blockchunk = blocks[0:min(len(blocks), size)] + rest = blocks[min(len(blocks), size):] + return blockchunk, rest + + +class SegmentationJob(TaskJob): + + def __init__(self, job): + self.job = job + pass + + def perform_task(self): + response_json = {"jobId": self.job.id, "submissions": self.job.submissions} + JSONHandler.writeJsonToFile(self.job.id, "segmentation_task", response_json) + return response_json + + + + +class ClusteringJob(TaskJob): + + def __init__(self, job): + self.job = job + pass + + def perform_task(self): + response_json = {"jobId": self.job.id, "embeddings": self.job.embeddings, "multilingual": self.job.multilingual} + JSONHandler.writeJsonToFile(self.job.id, "clustering_task", response_json) + return response_json diff --git a/load-balancer/src/compute_node_config.yml b/load-balancer/src/compute_node_config.yml index 777191c3..610834a8 100644 --- a/load-balancer/src/compute_node_config.yml +++ b/load-balancer/src/compute_node_config.yml @@ -11,6 +11,7 @@ docker_nodes: - traefik_service_api: http://athene-traefik:8080/api/http/services/ segmentation_service_name: segmentation-athene@docker embedding_service_name: embedding-athene@docker + embedding_wmt_service_name: embedding-wmt-athene@docker clustering_service_name: clustering-athene@docker trigger_route: /trigger @@ -26,6 +27,11 @@ docker_nodes: # trigger_url: http://athene-embedding:8000/trigger # type: embedding +# - name: embedding-stand-alone +# # This url resolves to the embedding-container inside the docker-compose network +# trigger_url: http://athene-embedding-wmt:8000/trigger +# type: embedding-wmt + # - name: clustering-stand-alone # # This url resolves to the clustering-container inside the docker-compose network # trigger_url: http://athene-clustering:8000/trigger diff --git 
a/load-balancer/src/entities.py b/load-balancer/src/entities.py index 9d294d28..d179534f 100644 --- a/load-balancer/src/entities.py +++ b/load-balancer/src/entities.py @@ -9,6 +9,9 @@ class NodeType: segmentation = "segmentation" embedding = "embedding" + + embedding_wmt = "embedding_wmt" + clustering = "clustering" gpu = "gpu" @@ -74,7 +77,7 @@ class AtheneJob: clusterTree: list # cluster tree of the clusters status: str # See class JobStatus - def __init__(self, id: int, course_id: int, callback_url: str, submissions: dict): + def __init__(self, id: int, course_id: int, callback_url: str, submissions: dict, multilingual: bool): self.id = id self.course_id = course_id self.callback_url = callback_url @@ -86,9 +89,10 @@ def __init__(self, id: int, course_id: int, callback_url: str, submissions: dict self.embedding_tasks = list() self.embedding_task_count = 0 self.status = JobStatus.segmentation_queued + self.multilingual = multilingual def __str__(self): - return "AtheneJob - ID: " + str(self.id) + ", courseId: " + str(self.course_id) + ", CallbackURL: " + str(self.callback_url) + ", submission_date: " + str(self.submission_date) + return "AtheneJob - ID: " + str(self.id) + ", courseId: " + str(self.course_id) + ", CallbackURL: " + str(self.callback_url) + ", submission_date: " + str(self.submission_date) + ", multilingual: " + str(self.multilingual) class ComputeNode: @@ -105,3 +109,18 @@ def __init__(self, name: str, type: str, url: str): def __str__(self): return "Name: " + self.name + ", Type: " + self.type + ", URL: " + self.url + + +class Policy: + + def __init__(self): + pass + + def define_embedding_type(multilingual: bool): + if multilingual: + return NodeType.embedding_wmt + else: + return NodeType.embedding + + + diff --git a/load-balancer/src/main.py b/load-balancer/src/main.py index c31d8e5d..e94a6f6c 100644 --- a/load-balancer/src/main.py +++ b/load-balancer/src/main.py @@ -1,4 +1,4 @@ -from .entities import AtheneJob, JobStatus, NodeType, EmbeddingTask +from .entities import AtheneJob, JobStatus, NodeType, Policy from .errors import invalidAuthorization, invalidJson, missingCallbackUrl, missingSubmissions, missingTaskType,\ missingChunkSize, invalidChunkSize, invalidTaskType, taskTypeError, missingJobId, invalidJobId, missingResultType,\ invalidResultType, missingTextBlocks, missingEmbeddings, missingTaskId, invalidResults, noUpdateNeeded,\ @@ -6,6 +6,9 @@ from fastapi import BackgroundTasks, FastAPI, Request, Response, status from requests.auth import HTTPBasicAuth from src.ConfigParser import ConfigParser +from src.TaskFactory import TaskFactory +from src.JSONHandler import JSONHandler +from src.ResultJob import ClusteringResult import hashlib import logging import os @@ -13,6 +16,8 @@ import sys import src.clustering_pb2 as Protobuf + + logger = logging.getLogger() # Set log_level to logging.DEBUG to write log files with json contents (see writeJsonToFile()) # Warning: This will produce a lot of data in production systems @@ -25,8 +30,28 @@ logger.addHandler(handler) app = FastAPI() -queue = list() -job_counter = 0 + + +class JobQueue: + + def __init__(self): + self.queue = list() + self.job_counter = 0 + + def get_queue(self): + return self.queue + + def get_job_counter(self): + return self.job_counter + + +jsonHandler = JSONHandler() +policy = Policy() +job_queue = JobQueue() +queue = job_queue.get_queue() +job_counter = job_queue.get_job_counter() + + def sizeof(obj): size = sys.getsizeof(obj) @@ -45,31 +70,8 @@ def checkAuthorization(request: Request): 
request.headers.get("Authorization"))) raise invalidAuthorization - -async def parseJson(request: Request): - try: - return await request.json() - except Exception as e: - logger.error("Exception while parsing json: {}".format(str(e))) - raise invalidJson - - -def writeJsonToFile(job_id: int, filename: str, data): - # Only write file if log-level is DEBUG - if logger.level == logging.DEBUG: - try: - directory = "logs/job_" + str(job_id) - if not os.path.exists(directory): - os.makedirs(directory) - logger.debug("Writing data to logfile: {}".format(filename)) - with open(directory + "/" + filename + ".json", 'w') as outfile: - outfile.write(data) - except Exception as e: - logger.error("Error while writing logfile: {}".format(str(e))) - - def triggerNodes(node_type: str): - node_types = (NodeType.segmentation, NodeType.embedding, NodeType.clustering, NodeType.gpu) + node_types = (NodeType.segmentation, NodeType.embedding, NodeType.clustering, NodeType.embedding_wmt, NodeType.gpu) if node_type not in node_types: logger.error('Invalid node_type: \'{}\''.format(node_type)) @@ -151,7 +153,7 @@ def sendBackResults(job: AtheneJob): "Authorization": auth_secret, "Content-type": "application/x-protobuf" } - response = requests.post(job.callback_url, data=final_result, headers=headers, timeout=600) + response = requests.post(job.callback_url, data=final_result, headers=headers, timeout=1800) if response.status_code == status.HTTP_200_OK: logger.info("Callback successful") logger.info("Athene Job finished: " + str(job)) @@ -170,7 +172,7 @@ def sendBackResults(job: AtheneJob): async def submit_job(request: Request, response: Response): checkAuthorization(request) - job_request = await parseJson(request) + job_request = await JSONHandler.parseJson(request) # Error handling if "courseId" in job_request: @@ -187,11 +189,13 @@ async def submit_job(request: Request, response: Response): # Queue up new job global job_counter job_counter += 1 - writeJsonToFile(job_counter, "submission", job_request) + + JSONHandler.writeJsonToFile(job_counter, "submission", job_request) new_job = AtheneJob(id=job_counter, course_id=course_id, callback_url=job_request["callbackUrl"], - submissions=job_request["submissions"]) + submissions=job_request["submissions"], + multilingual=job_request["multilingual"]) queue.append(new_job) logger.info("New Athene Job added: " + str(new_job)) # Trigger segmentation nodes @@ -201,45 +205,15 @@ async def submit_job(request: Request, response: Response): return {"detail": "Submission successful"} -# Returns a chunk of size of the blocks and the rest of the blocks -def createEmbeddingChunk(blocks, size): - # Minimum of 2 blocks is required to compute embeddings - prevent 1 single remaining block - if len(blocks) == (size + 1): - size += 1 - - blockchunk = blocks[0:min(len(blocks), size)] - rest = blocks[min(len(blocks), size):] - return blockchunk, rest - - # Endpoint for compute nodes to get a task # This will update the corresponding job and set the status to "processing" + @app.get("/getTask") async def get_task(request: Request, response: Response): checkAuthorization(request) + task = await JSONHandler.parseJson(request) - task = await parseJson(request) - - # Error handling - if "taskType" not in task: - raise missingTaskType - - if task["taskType"] == "segmentation": - required_status = JobStatus.segmentation_queued - new_status = JobStatus.segmentation_processing - elif task["taskType"] == "embedding": - if "chunkSize" not in task: - raise missingChunkSize - if 
int(task["chunkSize"]) < 2: - raise invalidChunkSize - required_status = [JobStatus.embedding_queued, JobStatus.embedding_queued_and_processing] - new_status = JobStatus.embedding_queued_and_processing - elif task["taskType"] == "clustering": - required_status = JobStatus.clustering_queued - new_status = JobStatus.clustering_processing - else: - raise invalidTaskType - + required_status, new_status = TaskFactory.set_status(task) logger.info("Host {} requested {}-task".format(request.client.host, task["taskType"])) # TODO: Check for timed out jobs and put back in queue @@ -247,38 +221,8 @@ async def get_task(request: Request, response: Response): if hasattr(job, 'status') and job.status in required_status: job.status = new_status logger.info("Host {} gets {}-task for JobId {}".format(request.client.host, task["taskType"], job.id)) - if task["taskType"] == "segmentation": - response_json = {"jobId": job.id, "submissions": job.submissions} - writeJsonToFile(job.id, "segmentation_task", response_json) - return response_json - elif task["taskType"] == "embedding": - # Create chunk of blocks for embedding node - chunk, rest = createEmbeddingChunk(job.blocks_to_embed, task["chunkSize"]) - job.embedding_task_count += 1 - new_task = EmbeddingTask(id=job.embedding_task_count, course_id=job.course_id, blocks=chunk) - job.embedding_tasks.append(new_task) - job.blocks_to_embed = rest - if len(job.blocks_to_embed) == 0: - job.status = JobStatus.embedding_processing - logger.info("embedding-task for JobId {} created: " - "taskId={}, size={}/{} (actual/requested)".format(job.id, - new_task.id, - len(new_task.blocks), - task["chunkSize"])) - - response_json = {"jobId": job.id, - "taskId": new_task.id, - "courseId": new_task.course_id, - "blocks": new_task.blocks} - writeJsonToFile(job.id, "embedding_task_" + str(new_task.id), response_json) - return response_json - elif task["taskType"] == "clustering": - response_json = {"jobId": job.id, "embeddings": job.embeddings} - writeJsonToFile(job.id, "clustering_task", response_json) - return response_json - else: - logger.error("Error with taskType {}".format(task["taskType"])) - raise taskTypeError + defined_job = TaskFactory.define_job(task, job) + return defined_job.perform_task() response.status_code = status.HTTP_204_NO_CONTENT return {"detail": "No {}-task available".format(str(task["taskType"]))} @@ -289,7 +233,7 @@ async def get_task(request: Request, response: Response): async def send_result(request: Request, response: Response, background_tasks: BackgroundTasks): checkAuthorization(request) - result = await parseJson(request) + result = await JSONHandler.parseJson(request) # Error handling if "jobId" not in result: @@ -298,109 +242,27 @@ async def send_result(request: Request, response: Response, background_tasks: Ba raise invalidJobId if "resultType" not in result: raise missingResultType - if result["resultType"] not in ["segmentation", "embedding", "clustering"]: + if result["resultType"] not in ["segmentation", "embedding", "clustering", "embedding_wmt"]: raise invalidResultType + logger.info("Host {} sent result for {}-task with jobId {}".format(request.client.host, result["resultType"], - result["jobId"])) + result["jobId"], + )) # Search for job with provided jobId for job in queue: if job.id == int(result["jobId"]): - # Segmentation results - if job.status == JobStatus.segmentation_processing and result["resultType"] == "segmentation": - if "textBlocks" not in result: - raise missingTextBlocks - writeJsonToFile(job.id, "segmentation_result", 
result) - # Transform segmentation result to blocks (embedding input) - for block in result["textBlocks"]: - submission_id = int(block["id"]) - start_index = int(block["startIndex"]) - end_index = int(block["endIndex"]) - # Search for the corresponding submission and create block out of segmentation result information - for submission in job.submissions: - if submission["id"] == submission_id: - block_text = submission["text"][start_index:end_index] - id_string = str(submission_id) + ";"\ - + str(start_index) + "-"\ - + str(end_index) + ";"\ - + block_text - block_id = hashlib.sha1(id_string.encode()).hexdigest() - new_block = {"id": block_id, - "submissionId": submission_id, - "text": block_text, - "startIndex": start_index, - "endIndex": end_index, - "type": "AUTOMATIC"} - job.blocks.append(new_block) # Will persist in job - job.blocks_to_embed.append(new_block) # Will get removed with embedding queries - break - job.status = JobStatus.embedding_queued - logger.info("JobId {} transitioned to status {}".format(job.id, job.status)) - # Trigger embedding nodes - triggerNodes(node_type=NodeType.embedding) - return {"detail": "Updated job: processed segmentation results"} - # Embedding results - elif (job.status == JobStatus.embedding_processing - or job.status == JobStatus.embedding_queued_and_processing)\ - and result["resultType"] == "embedding": - if "embeddings" not in result: - raise missingEmbeddings - if "taskId" not in result: - raise missingTaskId - - writeJsonToFile(job.id, "embedding_result_" + str(result["taskId"]), result) - - # Add results to job and remove corresponding embedding-task out of queue - valid_results = False - for task in job.embedding_tasks: - if str(task.id) == str(result["taskId"]): - # Check if number of embeddings is correct - if not len(task.blocks) == len(result["embeddings"]): - raise invalidResults - for embedding in result["embeddings"]: - job.embeddings.append(embedding) - valid_results = True - logger.info("embedding-task {} of JobId {} finished".format(task.id, job.id)) - job.embedding_tasks.remove(task) - break - - if not valid_results: - raise noUpdateNeeded - - # Check if all embeddings of job finished - if len(job.blocks_to_embed) == 0 and len(job.embedding_tasks) == 0: - job.status = JobStatus.clustering_queued - logger.info("JobId {} transitioned to status {}".format(job.id, job.status)) - # Trigger clustering nodes - triggerNodes(node_type=NodeType.clustering) - - return {"detail": "Updated job: processed embedding results"} - # Clustering results - elif job.status == JobStatus.clustering_processing and result["resultType"] == "clustering": - if "clusters" not in result: - raise missingClusters - if "distanceMatrix" not in result: - raise missingDistanceMatrix - if "clusterTree" not in result: - raise missingClusterTree - - writeJsonToFile(job.id, "clustering_result", result) - - job.clusters = result["clusters"] - job.distanceMatrix = result["distanceMatrix"] - job.clusterTree = result["clusterTree"] - - # Send back results to Artemis via callback URL in the background - job.status = JobStatus.sending_back + + result_type = TaskFactory.define_result(job, result) + + result_type.send_result(job, result) + + if isinstance(result_type, ClusteringResult): background_tasks.add_task(sendBackResults, job) - return {"detail": "Updated job: processed clustering results"} - # No valid request - else: - logger.info("No such update for jobId {} needed. 
Job status is {}".format(result["jobId"], job.status)) - raise noUpdateNeeded - raise invalidJobId + + # Provides statistics about the number of jobs summarized by their status @@ -449,4 +311,4 @@ def queueStatus(): "clustering_processing": clustering_processing, "sending_back": sending_back, "finished": finished, - "total": total} + "total": total} \ No newline at end of file
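
Editor's note, a minimal sketch (not part of the patch): it shows how the new `multilingual` flag steers a job towards the WMT embedding nodes via `Policy.define_embedding_type` from `load-balancer/src/entities.py`. The `NodeType` values and the `courseId`/`callbackUrl` fields mirror the diff and the load-balancer README; the rest of the `/submit` payload below is a hypothetical example.

```python
class NodeType:
    # Values mirror load-balancer/src/entities.py after this patch
    embedding = "embedding"
    embedding_wmt = "embedding_wmt"


def define_embedding_type(multilingual: bool) -> str:
    # Mirrors Policy.define_embedding_type: multilingual jobs go to the WMT nodes
    return NodeType.embedding_wmt if multilingual else NodeType.embedding


# Hypothetical /submit payload; "multilingual" is the field this patch adds
# and reads unconditionally in submit_job().
submission = {
    "courseId": 2,
    "callbackUrl": "http://testurl",
    "multilingual": True,
    "submissions": [
        {"id": 1, "text": "Erste Antwort."},
        {"id": 2, "text": "Second answer."},
    ],
}

# After segmentation finishes, only the matching embedding nodes are triggered:
print(define_embedding_type(submission["multilingual"]))  # -> embedding_wmt
```

The same flag is forwarded to the clustering task, where it switches the clustering service to the one-hot encoding path shown at the top of this patch.
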