Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added multilingual support, refactored load balancer #49

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

# traefik variables
TRAEFIK_DASHBOARD_PORT=8081
TRAEFIK_DASHBOARD_PORT=8082
TRAEFIK_HTTP_PORT=80
TRAEFIK_HTTPS_PORT=443

Expand Down
16 changes: 15 additions & 1 deletion clustering/src/ProcessingResource.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import json
import os
import requests
import numpy as np


class ProcessingResource:
__logger = getLogger(__name__)
__clustering: Clustering = Clustering()
vocab_len = 42024

def __default(self, o) -> int :
if isinstance(o, int64): return int(o)
Expand All @@ -31,12 +33,18 @@ def processTask(self, data):
raise requireTwoEmbeddings

embeddings: List[Embedding] = list(map(lambda dict: Embedding.from_dict(dict), data['embeddings']))

if len(embeddings) < 2:
self.__logger.error("{} ({})".format(requireTwoEmbeddings.title, requireTwoEmbeddings.description))
raise requireTwoEmbeddings

self.__logger.info("Computing clusters of {} embeddings.".format(len(embeddings)))
vectors: List[ElmoVector] = list(map(lambda e: e.vector, embeddings))

if data["multilingual"]:
vectors = list(map(lambda v: self.one_hot_encode(v), vectors))
embeddings: List[Embedding] = list(map(lambda e : Embedding.from_wmt_embedding(e.id, self.one_hot_encode(e.vector)), embeddings))

labels, probabilities = self.__clustering.cluster(vectors)

clusterLabels: List[int] = list(map(lambda i: int(i), set(labels)))
Expand Down Expand Up @@ -125,7 +133,7 @@ def getNewTask(self):
headers = {
"Authorization": auth_secret
}
task = requests.get(get_task_url, json={"taskType": "clustering"}, headers=headers, timeout=30)
task = requests.get(get_task_url, json={"taskType": "clustering"}, headers=headers, timeout=300)
except Exception as e:
self.__logger.error("getTask-API seems to be down: {}".format(str(e)))
return None
Expand All @@ -138,3 +146,9 @@ def getNewTask(self):
except Exception as e:
self.__logger.error("Exception while parsing json: {}".format(str(e)))
return None

def one_hot_encode(self, vector):
encoded_vector = np.zeros(self.vocab_len)
for count, i in enumerate(vector):
encoded_vector[int(i)] = 1 if i < self.vocab_len else 0
return encoded_vector
5 changes: 5 additions & 0 deletions clustering/src/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, id: str, text: Sentence = None):
def from_dict(cls, dict: dict) -> 'TextBlock':
return cls(dict['id'], dict['text'])


class Embedding:
id: str
vector: ElmoVector
Expand All @@ -28,3 +29,7 @@ def __init__(self, id: str, vector: ElmoVector):
    @classmethod
    def from_dict(cls, dict: dict) -> 'Embedding':
        """Create an Embedding from a JSON-style dict with 'id' and 'vector' keys."""
        return cls(dict['id'], dict['vector'])

    @classmethod
    def from_wmt_embedding(cls, id: str, vector) -> 'Embedding':
        """Create an Embedding from a WMT-produced (id, vector) pair.

        NOTE(review): currently identical to the positional constructor;
        presumably kept as a separate named entry point so WMT-specific
        conversion can be added here later — confirm with the authors.
        """
        return cls(id, vector)
Binary file added data/db/diagnostic.data/metrics.interim
Binary file not shown.
22 changes: 15 additions & 7 deletions docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ version: '3'
# By that, rebuilding the containers is not needed for code changes locally as long as no requirements change

services:
traefik:
# This component does not need to be changed
#traefik:
#This component does not need to be changed

load-balancer:
volumes:
Expand All @@ -19,9 +19,17 @@ services:

embedding:
volumes:
- ./embedding/src:/usr/src/app/src
- ./text_preprocessing:/usr/src/app/src/text_preprocessing
- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
#- ./embedding/src:/usr/src/app/src
#- ./text_preprocessing:/usr/src/app/src/text_preprocessing
#- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
# This will keep the subdirectory as it is in the container
- /usr/src/app/src/resources

embedding-wmt:
volumes:
- ./embedding_wmt/src:/usr/src/app/src
#- ./text_preprocessing:/usr/src/app/src/text_preprocessing
#- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
# This will keep the subdirectory as it is in the container
- /usr/src/app/src/resources

Expand All @@ -33,5 +41,5 @@ services:
volumes:
- ./tracking/src:/usr/src/app/src

database:
# This component does not need to be changed
#database:
# This component does not need to be changed
43 changes: 39 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
# http://localhost:8081/api
# http://localhost:8081/dashboard
traefik:
image: traefik:v2.2.1
image: "traefik:v2.2.1"
container_name: athene-traefik
restart: unless-stopped
depends_on:
Expand Down Expand Up @@ -83,7 +83,8 @@ services:
build:
context: .
dockerfile: ./embedding/Dockerfile
image: athene-embedding
image: "ghcr.io/ls1intum/athene/athene-embedding:94b958a"
#image: embedding_wmt
container_name: athene-embedding
restart: unless-stopped
depends_on:
Expand All @@ -105,8 +106,8 @@ services:
- CHUNK_SIZE=${EMBEDDING_CHUNK_SIZE}
- BALANCER_SENDRESULT_URL=${BALANCER_SENDRESULT_URL}
working_dir: /usr/src/app
volumes:
- ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
#volumes:
# - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
networks:
- athene
labels:
Expand All @@ -117,6 +118,40 @@ services:
- traefik.http.routers.upload-tls.entrypoints=websecure
- traefik.http.routers.upload-tls.tls=true

embedding-wmt:
build:
context: .
dockerfile: ./embedding_wmt/Dockerfile
#image: "ghcr.io/ls1intum/athene/athene-embedding:94b958a"
image: embedding_wmt
container_name: athene-embedding-wmt
restart: unless-stopped
depends_on:
- load-balancer
- database
# ports:
# - 8002:8000
expose:
- 8000
environment:
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_USER=${EMBEDDING_DATABASE_USER}
- DATABASE_PWD=${EMBEDDING_DATABASE_PWD}
- BALANCER_AUTHORIZATION_SECRET=${LOAD_BALANCER_AUTHORIZATION_SECRET}
- BALANCER_QUEUE_FREQUENCY=${BALANCER_QUEUE_FREQUENCY}
- BALANCER_GETTASK_URL=${BALANCER_GETTASK_URL}
- CHUNK_SIZE=${EMBEDDING_CHUNK_SIZE}
- BALANCER_SENDRESULT_URL=${BALANCER_SENDRESULT_URL}
working_dir: /usr/src/app
#volumes:
# - ${EMBEDDING_CLOUD_CONFIG_PATH}:/usr/src/app/src/cloud/config.py:ro
networks:
- athene
labels:
- traefik.enable=true

clustering:
build:
context: .
Expand Down
2 changes: 2 additions & 0 deletions embedding/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# Download ELMo model from AllenNLP
# https://allennlp.org/elmo
deps:
@$(MAKE) -C src/resources/models;
1 change: 0 additions & 1 deletion embedding/src/resources/models/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Download ELMo model from AllenNLP
# https://allennlp.org/elmo

deps: elmo_2x4096_512_2048cnn_2xhighway_5.5B
Expand Down
25 changes: 25 additions & 0 deletions embedding_wmt/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM python:3.9
LABEL author="Jan Philip Bernius <[email protected]>"

# Install pinned Python dependencies first so this layer is cached
# independently of source changes.
COPY ./embedding_wmt/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -qr /tmp/requirements.txt


WORKDIR /usr/src/app
COPY ./embedding_wmt/src/resources src/resources

# Install git + git-lfs in a single layer and drop the apt lists to keep
# the image small (separate `RUN apt-get update` layers can also go stale
# and break later installs).
RUN apt-get -y update \
    && apt-get -y install --no-install-recommends git git-lfs \
    && rm -rf /var/lib/apt/lists/* \
    && git lfs install

# Fetch the pre-trained WMT19 de->en model from the Hugging Face hub.
# The model weights are stored via git-lfs; running `git lfs install`
# above ensures the clone pulls the real weights, not LFS pointer files.
RUN git clone https://huggingface.co/facebook/wmt19-de-en src/resources/models


# Run the image as a non-root user
RUN groupadd -r textemb && useradd --no-log-init -r -g textemb textemb
RUN mkdir -p /usr/src/app/logs/ && chown -R textemb:textemb /usr/src/app/
VOLUME ["/usr/src/app/logs"]
USER textemb

EXPOSE 8000
CMD ["uvicorn", "--host", "0.0.0.0", "--port", "8000", "src.main:app"]
2 changes: 2 additions & 0 deletions embedding_wmt/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
deps:
@$(MAKE) -C src/resources;
48 changes: 48 additions & 0 deletions embedding_wmt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
Service runs on port 8002

```bash
uvicorn --port 8002 --host 0.0.0.0 --reload src.main:app
```

Note: The Artemis configuration files specify port 8080 for this service.

Configurable environment variables:

- BALANCER\_QUEUE\_FREQUENCY
- BALANCER\_GETTASK\_URL
- CHUNK\_SIZE
- BALANCER\_SENDRESULT\_URL

The following API routes will be available after startup:
[http://localhost:8002/trigger](http://localhost:8002/trigger)


Input example JSON for POST on http://localhost:8002/embed

```json
{
"courseId" : "1234",
"blocks": [
{
"id" : 1,
"text" : "This is the first text block."
},
{
"id" : 2,
"text" : "and this is the second one"
}
]
}
```


Input example JSON for POST on http://localhost:8002/upload

```json
{
"courseId" : "1234",
"fileName" : "file.pdf",
"fileData" : "a21s5d5sqa354a34575"
}
```

12 changes: 12 additions & 0 deletions embedding_wmt/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Listed dependencies as of 2022-02-16
transformers==4.16.1
docutils==0.18.1
fastapi==0.73.0
joblib==1.1.0
numpy==1.22.1
requests==2.27.1
scikit-learn==1.0.2
scipy==1.7.3
uvicorn==0.17.1
torch==1.8.0
sentencepiece==0.1.96
104 changes: 104 additions & 0 deletions embedding_wmt/src/ProcessingResource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import json
import os
import requests
from typing import List
from logging import getLogger
from datetime import datetime
from numpy import ndarray

from .entities import Sentence, TextBlock, Embedding
from .errors import requireTwoBlocks
from .wmt import WMT


class ProcessingResource:
    """Worker that pulls embedding tasks from the load balancer, embeds the
    text blocks with a WMT translation model, and posts the results back.

    NOTE(review): statement order here (logging, env lookups, HTTP calls)
    is preserved exactly; only documentation was added/translated.
    """
    __logger = getLogger(__name__)


    def __default(self, o) -> int:
        """JSON serialization fallback for `json.dump(s)`: unwraps Embedding
        objects to their __dict__ and numpy arrays to plain lists."""
        if isinstance(o, Embedding): return o.__dict__
        if isinstance(o, ndarray): return o.tolist()
        raise TypeError

    # Starts processing of a queried task
    def processTask(self, data):
        """Embed the text blocks of one task and send the results back.

        :param data: task dict from the load balancer; must contain
            'blocks', 'jobId' and 'taskId' keys
        :raises requireTwoBlocks: if 'blocks' is missing or has fewer than
            two entries
        """
        self.__logger.debug("-" * 80)
        self.__logger.info("Start processing Embedding Request:")

        if "blocks" not in data:
            self.__logger.error("{} ({})".format(requireTwoBlocks.title, requireTwoBlocks.description))
            raise requireTwoBlocks

        # NOTE(review): a fresh WMT model wrapper is constructed per task —
        # confirm this (re)load cost is intended rather than a one-time init.
        self.__wmt = WMT()

        blocks: List[TextBlock] = list(map(lambda dict: TextBlock.from_dict(dict), data['blocks']))

        sentences: List[Sentence] = list(map(lambda b: b.text, blocks))

        if len(blocks) < 2:
            self.__logger.error("{} ({})".format(requireTwoBlocks.title, requireTwoBlocks.description))
            raise requireTwoBlocks

        self.__logger.info("Computing embeddings of {} blocks.".format(len(blocks)))

        # One vector per input sentence, in block order.
        vectors: List[List[int]] = self.__wmt.embed_sentences(sentences)
        embeddings: List[Embedding] = [Embedding(block.id, vectors[i]) for i, block in enumerate(blocks)]

        output = {
            'embeddings': embeddings
        }

        # (Log message below is German for "Here are the embeddings".)
        self.__logger.info("Hier sind die Embeddings")
        self.__logger.info(embeddings)

        # Translated from German: "Embedding is finished, it gets sent back to x"
        # — the results are posted back to the load balancer below.
        self.__logger.info("Completed Embedding Request.")
        self.__logger.debug("-" * 80)

        output["jobId"] = data["jobId"]
        output["taskId"] = data["taskId"]
        # Translated TODO from German ("adjust type"): result type for this
        # multilingual pipeline.
        output["resultType"] = "embedding_wmt"

        try:
            self.__logger.info("Writing logfile")
            with open("logs/embedding-{}.json".format(datetime.now()), 'w') as outfile:
                json.dump(output, outfile, ensure_ascii=False, default=self.__default)
        except Exception as e:
            # Logging the result locally is best-effort only.
            self.__logger.error("Error while writing logfile: {}".format(str(e)))

        self.__logger.info("Send back embedding-results")
        # Get container variable for load balancer url
        send_result_url = str(os.environ['BALANCER_SENDRESULT_URL']) if "BALANCER_SENDRESULT_URL" in os.environ else "http://localhost:8000/sendTaskResult"
        auth_secret = str(os.environ['BALANCER_AUTHORIZATION_SECRET']) if "BALANCER_AUTHORIZATION_SECRET" in os.environ else ""
        headers = {
            "Authorization": auth_secret
        }
        response = requests.post(send_result_url, data=json.dumps(output, default=self.__default), headers=headers, timeout=30)
        if response.status_code != 200:
            self.__logger.error("Sending back failed: {}".format(response.text))

    # Queries the taskQueue and returns the task data (json)
    def getNewTask(self):
        """Poll the load balancer for the next embedding task.

        :return: parsed task JSON dict, or None if the API is unreachable,
            returns a non-200 status, or the body is not valid JSON
        """
        try:
            # Get container variable for load balancer url
            get_task_url = str(os.environ['BALANCER_GETTASK_URL']) if "BALANCER_GETTASK_URL" in os.environ else "http://localhost:8000/getTask"
            chunk_size = int(os.environ['CHUNK_SIZE']) if "CHUNK_SIZE" in os.environ else 50
            auth_secret = str(os.environ['BALANCER_AUTHORIZATION_SECRET']) if "BALANCER_AUTHORIZATION_SECRET" in os.environ else ""
            headers = {
                "Authorization": auth_secret
            }
            # Translated TODO from German ("put embedding_wmt in here"):
            # this still requests taskType "embedding" even though results are
            # reported as "embedding_wmt" — confirm against the load balancer.
            task = requests.get(get_task_url, json={"taskType": "embedding", "chunkSize": chunk_size}, headers=headers, timeout=60)
        except Exception as e:
            self.__logger.error("getTask-API seems to be down: {}".format(str(e)))
            return None

        if task.status_code != 200:
            return None

        try:
            return task.json()
        except Exception as e:
            self.__logger.error("Exception while parsing json: {}".format(str(e)))
            return None
Loading