Skip to content

Commit

Permalink
Refatora task de agregação de diários (#81)
Browse files Browse the repository at this point in the history
- **Move interfaces para seus respectivos recursos**
- **Remove imports não usados**
- **Adiciona tasks.run_task**
- **Modifica ordem de comandos no Dockerfile**
- **Muda versão do Python para 3.9 no Dockerfile**
- **Otimiza uso de memória na task create_aggregates**
  • Loading branch information
ogecece authored Aug 23, 2024
2 parents 120c652 + daa54a3 commit 915a3b9
Show file tree
Hide file tree
Showing 37 changed files with 631 additions and 442 deletions.
1 change: 1 addition & 0 deletions data_extraction/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .interfaces import TextExtractorInterface
from .text_extraction import ApacheTikaTextExtractor, create_apache_tika_text_extraction
8 changes: 8 additions & 0 deletions data_extraction/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import abc

class TextExtractorInterface(abc.ABC):
    """Contract for components able to pull plain text out of a file."""

    @abc.abstractmethod
    def extract_text(self, filepath: str) -> str:
        """Return the textual content of the file located at ``filepath``."""
2 changes: 1 addition & 1 deletion data_extraction/text_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import requests

from tasks import TextExtractorInterface
from .interfaces import TextExtractorInterface


class ApacheTikaTextExtractor(TextExtractorInterface):
Expand Down
1 change: 1 addition & 0 deletions database/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .interfaces import DatabaseInterface
from .postgresql import PostgreSQL, create_database_interface
39 changes: 39 additions & 0 deletions database/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Dict, Iterable, Tuple
import abc


class DatabaseInterface(abc.ABC):
    """
    Contract for the persistence layer the tasks use to store and
    retrieve their data.
    """

    @abc.abstractmethod
    def _commit_changes(self, command: str, data: Dict) -> None:
        """Apply a change to the database and commit the transaction."""

    @abc.abstractmethod
    def select(self, command: str) -> Iterable[Tuple]:
        """Run a query and yield the matching rows."""

    @abc.abstractmethod
    def insert(self, command: str, data: Dict) -> None:
        """Add new entries to the database."""

    @abc.abstractmethod
    def update(self, command: str, data: Dict) -> None:
        """Modify existing entries in the database."""

    @abc.abstractmethod
    def delete(self, command: str, data: Dict) -> None:
        """Remove entries from the database."""
2 changes: 1 addition & 1 deletion database/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import psycopg2

from tasks import DatabaseInterface
from .interfaces import DatabaseInterface


def get_database_name():
Expand Down
1 change: 1 addition & 0 deletions index/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .interfaces import IndexInterface
from .opensearch import create_index_interface
44 changes: 44 additions & 0 deletions index/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Dict, Iterable
import abc


class IndexInterface(abc.ABC):
    """Contract for the search-index backend used by the application."""

    @abc.abstractmethod
    def create_index(self, index_name: str, body: Dict) -> None:
        """Create the index used by the application."""

    @abc.abstractmethod
    def refresh_index(self, index_name: str) -> None:
        """Refresh the index so subsequent searches see recent writes."""

    @abc.abstractmethod
    def index_document(
        self, document: Dict, document_id: str, index: str, refresh: bool
    ) -> None:
        """Store a single document in the index."""

    @abc.abstractmethod
    def search(self, query: Dict, index: str) -> Dict:
        """Run *query* against *index* and return the raw response."""

    @abc.abstractmethod
    def paginated_search(
        self, query: Dict, index: str, keep_alive: str
    ) -> Iterable[Dict]:
        """Run *query* against *index*, yielding results page by page."""


2 changes: 1 addition & 1 deletion index/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import opensearchpy

from tasks import IndexInterface
from .interfaces import IndexInterface


class OpenSearchInterface(IndexInterface):
Expand Down
1 change: 0 additions & 1 deletion main/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .__main__ import (
is_debug_enabled,
enable_debug_if_necessary,
start_to_process_pending_gazettes,
)
43 changes: 14 additions & 29 deletions main/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,7 @@
from database import create_database_interface
from storage import create_storage_interface
from index import create_index_interface
from tasks import (
create_aggregates,
create_gazettes_index,
create_aggregates_table,
create_themed_excerpts_index,
embedding_rerank_excerpts,
extract_text_from_gazettes,
extract_themed_excerpts_from_gazettes,
get_gazettes_to_be_processed,
get_themes,
get_territories,
tag_entities_in_excerpts,
)
from tasks import run_task


def is_debug_enabled():
Expand All @@ -44,30 +32,27 @@ def gazette_texts_pipeline():
storage = create_storage_interface()
index = create_index_interface()
text_extractor = create_apache_tika_text_extraction()
themes = get_themes()

create_gazettes_index(index)
territories = get_territories(database)
gazettes_to_be_processed = get_gazettes_to_be_processed(execution_mode, database)
indexed_gazette_ids = extract_text_from_gazettes(
gazettes_to_be_processed, territories, database, storage, index, text_extractor
)

themes = run_task("get_themes")

run_task("create_gazettes_index", index)
territories = run_task("get_territories", database)
gazettes_to_be_processed = run_task("get_gazettes_to_be_processed", execution_mode, database)
indexed_gazette_ids = run_task("extract_text_from_gazettes", gazettes_to_be_processed, territories, database, storage, index, text_extractor)

for theme in themes:
create_themed_excerpts_index(theme, index)
themed_excerpt_ids = extract_themed_excerpts_from_gazettes(
theme, indexed_gazette_ids, index
)
embedding_rerank_excerpts(theme, themed_excerpt_ids, index)
tag_entities_in_excerpts(theme, themed_excerpt_ids, index)
run_task("create_themed_excerpts_index", theme, index)
themed_excerpt_ids = run_task("extract_themed_excerpts_from_gazettes", theme, indexed_gazette_ids, index)
run_task("embedding_rerank_excerpts", theme, themed_excerpt_ids, index)
run_task("tag_entities_in_excerpts", theme, themed_excerpt_ids, index)


def aggregates_pipeline():
database = create_database_interface()
storage = create_storage_interface()

create_aggregates_table(database)
create_aggregates(database, storage)
run_task("create_aggregates_table", database)
run_task("create_aggregates", database, storage)


def execute_pipeline(pipeline):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ scikit-learn==1.0.2
sentence-transformers==2.2.0
huggingface-hub==0.10.1 # fix: https://github.com/UKPLab/sentence-transformers/issues/1762
python-slugify[unidecode]==8.0.1
numpy==1.26.4 # fix numpy dtype size changed error with numpy>=2.0.0
8 changes: 4 additions & 4 deletions scripts/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/python:3.8
FROM docker.io/python:3.9

ENV USER gazette
ENV USER_HOME /home/$USER
Expand All @@ -16,8 +16,8 @@ ENV PYTHONPATH $WORKDIR
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . $WORKDIR
WORKDIR $WORKDIR
USER $USER

RUN python -c "import sentence_transformers; sentence_transformers.SentenceTransformer('neuralmind/bert-base-portuguese-cased').save('"$USER_HOME"/models/bert-base-portuguese-cased')"

COPY . $WORKDIR
WORKDIR $WORKDIR
1 change: 1 addition & 0 deletions storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .digital_ocean_spaces import DigitalOceanSpaces, create_storage_interface
from .interfaces import StorageInterface
65 changes: 63 additions & 2 deletions storage/digital_ocean_spaces.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
import os
from typing import Generator, Union
from typing import Union
from io import BytesIO
from pathlib import Path

import boto3

from tasks import StorageInterface
from .interfaces import StorageInterface


def get_storage_region():
Expand Down Expand Up @@ -91,6 +91,67 @@ def upload_content(
content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission}
)

def upload_file(
    self,
    file_key: str,
    file_path: str,
    permission: str = "public-read",
) -> None:
    """Upload the local file at *file_path* to the bucket under *file_key*,
    applying *permission* as the object's ACL."""
    logging.debug(f"Uploading {file_key}")
    acl_args = {"ACL": permission}
    self._client.upload_file(file_path, self._bucket, file_key, ExtraArgs=acl_args)

def upload_file_multipart(
    self,
    file_key: str,
    file_path: str,
    permission: str = "public-read",
    part_size: int = 100 * 1024 * 1024,
) -> None:
    """
    Upload the local file at *file_path* to the bucket under *file_key*
    using the S3 multipart-upload protocol, reading at most *part_size*
    bytes (default 100 MiB) into memory at a time.

    The upload is aborted server-side and the exception re-raised if any
    part fails, so no incomplete upload is left consuming storage.
    """
    logging.debug(f"Uploading {file_key} with multipart")

    # Open the multipart session first; its UploadId ties all parts together.
    multipart_upload = self._client.create_multipart_upload(Bucket=self._bucket, Key=file_key, ACL=permission)
    upload_id = multipart_upload['UploadId']

    # Collected {'PartNumber', 'ETag'} records, required to complete the upload.
    parts = []

    try:
        with open(file_path, 'rb') as file:
            # Part numbers must start at 1 (S3 multipart API requirement).
            part_number = 1
            while True:
                # Stream the file in fixed-size chunks to bound memory usage.
                data = file.read(part_size)
                if not data:
                    break

                response = self._client.upload_part(
                    Bucket=self._bucket,
                    Key=file_key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=data
                )

                # The ETag returned for each part must be echoed back on completion.
                parts.append({
                    'PartNumber': part_number,
                    'ETag': response['ETag']
                })
                part_number += 1

        # Assemble the uploaded parts into the final object.
        self._client.complete_multipart_upload(
            Bucket=self._bucket,
            Key=file_key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )

    except Exception as e:
        # Abort so the partial upload does not linger (and keep billing) on the server.
        logging.debug(f"Aborted uploading {file_key} with multipart")
        self._client.abort_multipart_upload(Bucket=self._bucket, Key=file_key, UploadId=upload_id)
        raise e
    else:
        logging.debug(f"Finished uploading {file_key} with multipart")

def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
logging.debug(f"Copying {source_file_key} to {destination_file_key}")
self._client.copy_object(
Expand Down
34 changes: 34 additions & 0 deletions storage/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Union
from pathlib import Path
import abc
from io import BytesIO


class StorageInterface(abc.ABC):
    """Contract for the object-store backend used by the application."""

    @abc.abstractmethod
    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
        """Download the object identified by the given key into *destination*."""

    @abc.abstractmethod
    def upload_content(self, file_key: str, content_to_be_uploaded: Union[str, BytesIO]) -> None:
        """Upload the given in-memory content to the store under *file_key*."""

    @abc.abstractmethod
    def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
        """Copy an object from one key to another within the store."""

    @abc.abstractmethod
    def delete_file(self, file_key: str) -> None:
        """Remove the object stored under *file_key*."""
39 changes: 23 additions & 16 deletions tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from .create_index import create_gazettes_index, create_themed_excerpts_index
from .create_aggregates_table import create_aggregates_table
from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
from .gazette_text_extraction import extract_text_from_gazettes
from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes
from .gazette_themes_listing import get_themes
from .gazette_txt_to_xml import create_aggregates
from .interfaces import (
DatabaseInterface,
StorageInterface,
IndexInterface,
TextExtractorInterface,
)
from .list_gazettes_to_be_processed import get_gazettes_to_be_processed
from .list_territories import get_territories
from importlib import import_module


# Maps a task name to the module that defines a function of the same name.
# Modules are imported lazily by run_task so that importing the `tasks`
# package stays cheap and avoids circular imports.
AVAILABLE_TASKS = {
    "create_aggregates": "tasks.gazette_txt_to_xml",
    "create_gazettes_index": "tasks.create_index",
    "create_aggregates_table": "tasks.create_aggregates_table",
    "create_themed_excerpts_index": "tasks.create_index",
    "embedding_rerank_excerpts": "tasks.gazette_excerpts_embedding_reranking",
    "extract_text_from_gazettes": "tasks.gazette_text_extraction",
    "extract_themed_excerpts_from_gazettes": "tasks.gazette_themed_excerpts_extraction",
    "get_gazettes_to_be_processed": "tasks.list_gazettes_to_be_processed",
    "get_themes": "tasks.gazette_themes_listing",
    "get_territories": "tasks.list_territories",
    "tag_entities_in_excerpts": "tasks.gazette_excerpts_entities_tagging",
}


def run_task(task_name: str, *args, **kwargs):
    """
    Import the module registered for *task_name* in AVAILABLE_TASKS and call
    the function of the same name with the given arguments.

    Returns whatever the task function returns.

    Raises KeyError (with the unknown name and the list of valid tasks) when
    *task_name* is not registered.
    """
    try:
        module = AVAILABLE_TASKS[task_name]
    except KeyError:
        # Re-raise with an actionable message instead of a bare key.
        raise KeyError(
            f"Unknown task '{task_name}'. "
            f"Available tasks: {', '.join(sorted(AVAILABLE_TASKS))}"
        ) from None
    mod = import_module(module)
    task = getattr(mod, task_name)
    return task(*args, **kwargs)
2 changes: 1 addition & 1 deletion tasks/create_aggregates_table.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .interfaces import DatabaseInterface
from database import DatabaseInterface


def create_aggregates_table(database: DatabaseInterface):
Expand Down
2 changes: 1 addition & 1 deletion tasks/create_index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Dict

from .interfaces import IndexInterface
from index import IndexInterface


def create_gazettes_index(index: IndexInterface) -> None:
Expand Down
Loading

0 comments on commit 915a3b9

Please sign in to comment.