From 4e51a52f7fea29434477dea746726d70d3d6dcd6 Mon Sep 17 00:00:00 2001
From: Jefersonalves
Date: Wed, 29 Nov 2023 19:43:31 -0300
Subject: [PATCH] Refactor the segmentation process
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Turn the territory lookup into a task

Improve how municipality data is obtained from the municipality name

Handle errors that occurred in recent gazettes

Raise an exception when a territory lookup finds no match

Adjust the code so the segmenter no longer needs a database instance

Refactor the branching between regular and association gazettes

Change how segmenters use territories and gazettes

Territories are now available to segmenters through an instance
attribute created when the class is initialized. Assigning the
`Gazette` object to the segmenter instance also did not seem
necessary, so the object is now used only as an argument to
`.get_gazette_segments()` and no longer at initialization.

Since a segmenter instance can be reused without problems, a
singleton pattern was adopted when creating the instances.

Fix segmentation triggered by a false start marker

In the December 7, 2023 edition of the gazette of the Associação dos
Municípios Alagoanos, the gazette of Dois Riachos contains the header

```
ESTADO DE ALAGOAS
PREFEITURA MUNICIPAL DE EDUCAÇÃO
SECRETARIA MUNICIPAL DE EDUCAÇÃO DE DOIS RIACHOS
```

in the middle of an act's text. This re-triggered the segmentation,
producing an almost empty previous segment (practically just the
header) and a current segment with an incorrect municipality name
("EDUCAÇÃO"). The word EDUCAÇÃO has now been added to the regex as a
notable exception.

Refactor the municipality mapping to use python-slugify

The library already implements the transformations we use and also
provides extra functionality. The key created for a municipality now
follows a more classic slug format ("uf-nome-municipio").

Refactor split_text of al_associacao_municipios

The `split_text_by_territory()` method was producing incomplete
extractions. E.g.:

```
{
  "_index" : "querido-diario",
  "_type" : "_doc",
  "_id" : "7b7ed1557de74c25cff6a04023ed525f",
  "_score" : null,
  "_source" : {
    "id" : 6,
    "territory_name" : "Maribondo",
    "source_text" : "Alagoas , 02 de Outubro de 2023 • Diário Oficial dos Municípios do Estado de Alagoas • ANO XI | Nº 2145",
    "date" : "2023-10-02",
    "edition_number" : "2145",
    "is_extra_edition" : false,
    "power" : "executive_legislative",
    "file_checksum" : "7b7ed1557de74c25cff6a04023ed525f",
    "scraped_at" : "2023-12-11T20:25:43.705771",
    "created_at" : "2023-12-11T20:25:45.500616",
    "processed" : true,
    "file_path" : "2700000/2023-10-02/a4c2994ffab9f0dabc0d2c3ad46436f917d61efb.pdf",
    "file_url" : "https://www-storage.voxtecnologia.com.br?m=sigpub.publicacao&f=9878&i=publicado_91279_2023-09-29_564051d01bafcfea6f84735fc59f4d94.pdf",
    "state_code" : "AL",
    "territory_id" : "2704807",
    "file_raw_txt" : "/2704807/2023-10-02/7b7ed1557de74c25cff6a04023ed525f.txt",
    "url" : "http://google.com/2700000/2023-10-02/a4c2994ffab9f0dabc0d2c3ad46436f917d61efb.pdf"
  },
```

In this case, `source_text` contains only the header. The code was
also somewhat confusing, so it was refactored to simplify the
segmentation, relying more on `re.split()` instead of iterating
line by line.
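For context, here is a minimal sketch (illustrative only, not code from
this patch) of the mechanics the new implementation relies on: when the
pattern contains capturing groups, `re.split()` keeps the captured
delimiters in its output, so the flat result can be regrouped into
(start marker, municipality name, end marker, segment body) tuples with
`batched()`. The simplified `HEADER` pattern below is a hypothetical
stand-in for the real `RE_NOMES_MUNICIPIOS`:

```
import re
from itertools import islice

# Simplified stand-in for RE_NOMES_MUNICIPIOS: three capturing groups for
# the header start marker, the municipality name, and the header end marker.
HEADER = re.compile(r"(ESTADO DE ALAGOAS\nPREFEITURA MUNICIPAL DE )([^\n]+)(\n)")


def batched(iterable, n):
    # Same recipe as tasks/utils/python.py: regroup a flat iterable
    # into tuples of size n.
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch


text = (
    "ESTADO DE ALAGOAS\nPREFEITURA MUNICIPAL DE MARIBONDO\n"
    "ato da prefeitura...\n"
    "ESTADO DE ALAGOAS\nPREFEITURA MUNICIPAL DE VICOSA\n"
    "outro ato...\n"
)

# With capturing groups, re.split() keeps the groups between the chunks:
# ["", start, name, end, body, start, name, end, body]
parts = re.split(HEADER, text)[1:]
for start_marker, name, end_marker, body in batched(parts, 4):
    print(name, "->", body.strip())
# MARIBONDO -> ato da prefeitura...
# VICOSA -> outro ato...
```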
Another consequence of this change is that, besides more segments
being collected, the segment texts now reflect the original text more
faithfully, since whitespace and line breaks are no longer handled
line by line in the segment body.

Refactor gazette_text_extraction to handle associations

The task flow was somewhat confusing, especially at the branch between
an aggregated gazette file and a complete gazette. This refactoring
aims to simplify that flow and, as a consequence, the segmenters were
simplified along with the task.

Refactor index creation for tasks

Fix slug in segmentation

To eliminate the need for exceptions caused by municipalities whose
names improperly have a word split by a space, the slug no longer uses
a separator and concatenates everything. E.g.:

- al-senador-rui-palmeira -> alsenadorruipalmeira
- al-senador-rui-palme-ira -> alsenadorruipalmeira
---
 main/__main__.py                            |   8 +-
 requirements.txt                            |   1 +
 segmentation/base/association_segmenter.py |   7 +-
 segmentation/factory.py                     |  45 ++--
 .../segmenters/al_associacao_municipios.py  | 211 ++++++------------
 tasks/__init__.py                           |   2 +
 tasks/create_index.py                       | 153 +++++++++++++
 tasks/gazette_text_extraction.py            | 199 +++++------------
 tasks/gazette_themed_excerpts_extraction.py |  77 -------
 tasks/list_territories.py                   |  28 +++
 tasks/utils/__init__.py                     |   9 +-
 tasks/utils/python.py                       |  10 +
 tasks/utils/territories.py                  |  76 ++++---
 13 files changed, 402 insertions(+), 424 deletions(-)
 create mode 100644 tasks/create_index.py
 create mode 100644 tasks/list_territories.py
 create mode 100644 tasks/utils/python.py

diff --git a/main/__main__.py b/main/__main__.py
index ba4e2a0..e8dd5ec 100644
--- a/main/__main__.py
+++ b/main/__main__.py
@@ -6,11 +6,14 @@ from storage import create_storage_interface
 from index import create_index_interface
 from tasks import (
+    create_gazettes_index,
+    create_themed_excerpts_index,
     embedding_rerank_excerpts,
     extract_text_from_gazettes,
     extract_themed_excerpts_from_gazettes,
     get_gazettes_to_be_processed,
     get_themes,
+    get_territories,
     tag_entities_in_excerpts,
 )
@@ -42,12 +45,15 @@ def execute_pipeline():
     text_extractor = create_apache_tika_text_extraction()
     themes = get_themes()
 
+    create_gazettes_index(index)
+    territories = get_territories(database)
     gazettes_to_be_processed = get_gazettes_to_be_processed(execution_mode, database)
     indexed_gazette_ids = extract_text_from_gazettes(
-        gazettes_to_be_processed, database, storage, index, text_extractor
+        gazettes_to_be_processed, territories, database, storage, index, text_extractor
     )
 
     for theme in themes:
+        create_themed_excerpts_index(theme, index)
         themed_excerpt_ids = extract_themed_excerpts_from_gazettes(
             theme, indexed_gazette_ids, index
         )
diff --git a/requirements.txt b/requirements.txt
index 92894c1..eb8bb31 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ requests==2.25.0
 scikit-learn==1.0.2
 sentence-transformers==2.2.0
 huggingface-hub==0.10.1 # fix: https://github.com/UKPLab/sentence-transformers/issues/1762
+python-slugify[unidecode]==8.0.1
diff --git a/segmentation/base/association_segmenter.py b/segmentation/base/association_segmenter.py
index 45c2b51..0d777be 100644
--- a/segmentation/base/association_segmenter.py
+++ b/segmentation/base/association_segmenter.py
@@ -1,11 +1,10 @@
-from typing import Union, Dict, List
+from typing import Any, Dict, Iterable, List, Union
 
 from segmentation.base import GazetteSegment
 
 
 class AssociationSegmenter:
-    def __init__(self, association_gazette: str, territory_to_data: Dict):
-        self.association_gazette = association_gazette
-        self.territory_to_data = territory_to_data
+    def __init__(self, territories: Iterable[Dict[str, Any]]):
+        self.territories = territories
 
     def get_gazette_segments(self, *args, **kwargs) -> List[Union[GazetteSegment, Dict]]:
         """
diff --git a/segmentation/factory.py b/segmentation/factory.py
index dfd687a..65c693e 100644
--- a/segmentation/factory.py
+++ b/segmentation/factory.py
@@ -1,30 +1,34 @@
-from typing import Any, Dict
+from typing import Any, Dict, Iterable
 
 from segmentation.base import AssociationSegmenter
 from segmentation import segmenters
 
 
-def get_segmenter(territory_id: str, association_gazzete: Dict[str, Any], territory_to_data: Dict) -> AssociationSegmenter:
+_segmenter_instances = {}
+
+
+def get_segmenter(territory_id: str, territories: Iterable[Dict[str, Any]]) -> AssociationSegmenter:
     """
     Factory method to return an AssociationSegmenter
 
     Example
     -------
-    >>> association_gazette = {
-        "territory_name": "Associação",
-        "created_at": datetime.datetime.now(),
-        "date": datetime.datetime.now(),
-        "edition_number": 1,
-        "file_path": 'raw/pdf.pdf',
-        "file_url": 'localhost:8000/raw/pdf.pdf',
-        "is_extra_edition": True,
-        "power": 'executive',
-        "scraped_at": datetime.datetime.now(),
-        "state_code": 'AL',
-        "source_text": texto,
-    }
+    >>> territory_id = "9999999"
+    >>> territories = [
+        {
+            "id": "9999999",
+            "territory_name": "Bairro do Limoeiro",
+            "state_code": "ZZ",
+            "state": "Limoeirolândia",
+        }, {
+            "id": "0000000",
+            "territory_name": "Castelo Rá-Tim-Bum",
+            "state_code": "SP",
+            "state": "São Paulo",
+        },
+    ]
     >>> from segmentation import get_segmenter
-    >>> segmenter = get_segmenter(territory_id, association_gazette)
+    >>> segmenter = get_segmenter(territory_id, territories)
     >>> segments = segmenter.get_gazette_segments()
 
     Notes
@@ -37,6 +41,9 @@ def get_segmenter(territory_id: str, association_gazzete: Dict[str, Any], territ
         "2700000": "ALAssociacaoMunicipiosSegmenter",
     }
 
-    segmenter_class_name = territory_to_segmenter_class[territory_id]
-    segmenter_class = getattr(segmenters, segmenter_class_name)
-    return segmenter_class(association_gazzete, territory_to_data)
+    if territory_id not in _segmenter_instances:
+        segmenter_class_name = territory_to_segmenter_class[territory_id]
+        segmenter_class = getattr(segmenters, segmenter_class_name)
+        _segmenter_instances[territory_id] = segmenter_class(territories)
+
+    return _segmenter_instances[territory_id]
diff --git a/segmentation/segmenters/al_associacao_municipios.py b/segmentation/segmenters/al_associacao_municipios.py
index 794d9f3..7485e51 100644
--- a/segmentation/segmenters/al_associacao_municipios.py
+++ b/segmentation/segmenters/al_associacao_municipios.py
@@ -1,163 +1,88 @@
 import re
-import unicodedata
+import logging
 from typing import Any, Dict, List
 
 from segmentation.base import AssociationSegmenter, GazetteSegment
-from tasks.utils import get_checksum
+from tasks.utils import batched, get_checksum, get_territory_data, get_territory_slug
 
 
-class ALAssociacaoMunicipiosSegmenter(AssociationSegmenter):
-    def __init__(self, association_gazzete: Dict[str, Any], territory_to_data: Dict[str, Any]):
-        super().__init__(association_gazzete, territory_to_data)
-        # At the end of the regex there is a conditional structure that checks whether the next match is a \s or SECRETARIA. This was done to solve a problem in the 2018-10-02 gazette, in which the municipality of Coité do Nóia was not detected by the code. To solve it, we use the next word (SECRETARIA) to handle that case.
-        # Notable exceptions
-        # String: VAMOS, municipality of Poço das Trincheiras, 06/01/2022, act CCB3A6AB
-        self.RE_NOMES_MUNICIPIOS = (
-            r"ESTADO DE ALAGOAS(?:| )\n{1,2}PREFEITURA MUNICIPAL DE (.*\n{0,2}(?!VAMOS).*$)\n\s(?:\s|SECRETARIA)"
-        )
-        self.association_source_text = self.association_gazette["source_text"]
-        self.territory_to_data = self._format_territory_to_data(territory_to_data)
 
-    def get_gazette_segments(self) -> List[Dict[str, Any]]:
+
+class ALAssociacaoMunicipiosSegmenter(AssociationSegmenter):
+    RE_NOMES_MUNICIPIOS = re.compile(
+        r"""
+        (ESTADO\sDE\sALAGOAS(?:|\s)\n{1,2}PREFEITURA\sMUNICIPAL\sDE\s)  # Start marker of a municipality's publication header
+        ((?!EDUCAÇÃO).*?\n{0,2}(?!VAMOS).*?$)  # Municipality name (may span up to two lines). Notable exceptions: VAMOS, Poço das Trincheiras, 06/01/2022, act CCB3A6AB; EDUCAÇÃO, Dois Riachos, 07/12/2023, act ABCCE576
+        (\n\s(?:\s|SECRETARIA|Secretaria))  # End marker of the header (skips more than two lines). Notable exceptions: SECRETARIA, Coité do Nóia, 02/10/2018, act 12F7DE15; Secretaria, Quebrangulo, 18/07/2023, acts 27FB2D83 through 1FAF9421
+        """,
+        re.MULTILINE | re.VERBOSE,
+    )
+
+    def get_gazette_segments(self, gazette: Dict[str, Any]) -> List[Dict[str, Any]]:
         """
         Returns a list of dicts with the gazettes metadata
         """
-        territory_to_text_split = self.split_text_by_territory()
-        gazette_segments = []
-        for municipio, texto_diario in territory_to_text_split.items():
-            segmento = self.build_segment(municipio, texto_diario)
-            gazette_segments.append(segmento.__dict__)
+        territory_to_text_map = self.split_text_by_territory(gazette["source_text"])
+        gazette_segments = [
+            self.build_segment(territory_slug, segment_text, gazette).__dict__
+            for territory_slug, segment_text in territory_to_text_map.items()
+        ]
         return gazette_segments
 
-    def split_text_by_territory(self) -> Dict[str, str]:
+    def split_text_by_territory(self, text: str) -> Dict[str, str]:
         """
         Segment an association text by territory
         and returns a dict with the territory name and the text segment
         """
-        texto_diario_slice = self.association_source_text.lstrip().splitlines()
-
-        # Processing
-        linhas_apagar = []  # slice of lines to be deleted at the end
-        ama_header = texto_diario_slice[0]
-        ama_header_count = 0
-        codigo_count = 0
-        codigo_total = self.association_source_text.count("Código Identificador")
-
-        for num_linha, linha in enumerate(texto_diario_slice):
-            # Remove the AMA header, but keep its first occurrence.
-            if linha.startswith(ama_header):
-                ama_header_count += 1
-                if ama_header_count > 1:
-                    linhas_apagar.append(num_linha)
-
-            # Remove the final lines
-            if codigo_count == codigo_total:
-                linhas_apagar.append(num_linha)
-            elif linha.startswith("Código Identificador"):
-                codigo_count += 1
-
-        # Delete lines from the slice
-        texto_diario_slice = [l for n, l in enumerate(
-            texto_diario_slice) if n not in linhas_apagar]
-
-        # Insert the header into each municipality's gazette.
-        territory_to_text_split = {}
-        nomes_municipios = re.findall(
-            self.RE_NOMES_MUNICIPIOS, self.association_source_text, re.MULTILINE)
-        for municipio in nomes_municipios:
-            nome_municipio_normalizado = self._normalize_territory_name(municipio)
-            territory_to_text_split[nome_municipio_normalizado] = ama_header + '\n\n'
-
-        num_linha = 0
-        municipio_atual = None
-        while num_linha < len(texto_diario_slice):
-            linha = texto_diario_slice[num_linha].rstrip()
-
-            if linha.startswith("ESTADO DE ALAGOAS"):
-                nome = self._extract_territory_name(texto_diario_slice, num_linha)
-                if nome is not None:
-                    nome_normalizado = self._normalize_territory_name(nome)
-                    municipio_atual = nome_normalizado
-
-            # Only starts once some municipality has been found.
-            if municipio_atual is None:
-                num_linha += 1
-                continue
-
-            # Content belongs to a municipality
-            territory_to_text_split[municipio_atual] += linha + '\n'
-            num_linha += 1
-
-        return territory_to_text_split
-
-    def build_segment(self, raw_territory_name, segment_text) -> GazetteSegment:
-        file_checksum = get_checksum(segment_text)
-        processed = True
-        source_text = segment_text.rstrip()
-        state = self.association_gazette.get("state_code")
-        raw_territory_name = self._fix_territory_name(raw_territory_name)
-
-        territory_data = self.territory_to_data.get((self._clear_state_code(state), self._clear_city_name(raw_territory_name)))
+        ama_header = text.lstrip().split("\n", maxsplit=1)[0].rstrip()
+        # clean headers
+        clean_text = "\n".join(re.split(re.escape(ama_header), text))
+        # clean final lines
+        clean_text = "\n".join(
+            re.split(r"(Código Ide ?ntificador:\s*\w+)", clean_text)[:-1]
+        )
 
-        territory_id = territory_data["id"]
-        territory_name = territory_data["territory_name"]
-        date = self.association_gazette["date"]
-        file_raw_txt = f"/{territory_id}/{date}/{file_checksum}.txt"
-
-        return GazetteSegment(
-            # same association values
-            id=self.association_gazette.get("id"),
-            created_at=self.association_gazette.get("created_at"),
-            date=self.association_gazette.get("date"),
-            edition_number=self.association_gazette.get("edition_number"),
-            file_path=self.association_gazette.get("file_path"),
-            file_url=self.association_gazette.get("file_url"),
-            is_extra_edition=self.association_gazette.get("is_extra_edition"),
-            power=self.association_gazette.get("power"),
-            scraped_at=self.association_gazette.get("scraped_at"),
-            state_code=state,
-            url=self.association_gazette.get("url"),
+        raw_segments = re.split(self.RE_NOMES_MUNICIPIOS, clean_text)[1:]
+
+        territory_to_text_map = {}
+        for pattern_batch in batched(raw_segments, 4):
+            territory_name = pattern_batch[1]
+            clean_territory_name = self._normalize_territory_name(territory_name)
+            territory_slug = get_territory_slug(clean_territory_name, "AL")
+            previous_text_or_header = territory_to_text_map.setdefault(
+                territory_slug, f"{ama_header}\n"
+            )
+            raw_batch_text = "".join(pattern_batch)
+            new_territory_text = f"{previous_text_or_header}\n{raw_batch_text}"
+            territory_to_text_map[territory_slug] = new_territory_text
+
+        return territory_to_text_map
+
+    def build_segment(
+        self, territory_slug: str, segment_text: str, gazette: Dict
+    ) -> GazetteSegment:
+        logging.debug(
+            f"Creating segment for territory \"{territory_slug}\" from {gazette['file_path']} file."
+        )
+        territory_data = get_territory_data(territory_slug, self.territories)
+        return GazetteSegment(**{
+            **gazette,
             # segment specific values
-            file_checksum=file_checksum,
-            processed=processed,
-            territory_name=territory_name,
-            source_text=source_text,
-            territory_id=territory_id,
-            file_raw_txt=file_raw_txt,
+            "processed": True,
+            "file_checksum": get_checksum(segment_text),
+            "source_text": segment_text.strip(),
+            "territory_name": territory_data["territory_name"],
+            "territory_id": territory_data["id"],
+        })
+
+    def _normalize_territory_name(self, territory_name: str) -> str:
+        clean_name = territory_name.strip().replace("\n", "")
+        # Some municipality names have a trailing /AL, e.g. Viçosa in the 2022-01-17 gazette, act 8496EC0A. To avoid errors such as "vicosa-/al-secretaria-municipal...", the following line removes it.
+        clean_name = re.sub(
+            "\s*(\/AL.*|GABINETE DO PREFEITO.*|PODER.*|http.*|PORTARIA.*|Extrato.*|ATA DE.*|SECRETARIA.*|Fundo.*|SETOR.*|ERRATA.*|- AL.*|GABINETE.*|EXTRATO.*|SÚMULA.*|RATIFICAÇÃO.*)",
+            "",
+            clean_name,
         )
-
-    def _normalize_territory_name(self, municipio: str) -> str:
-        municipio = municipio.rstrip().replace('\n', '')  # initial cleanup
-        # Some municipality names have a trailing /AL, e.g. Viçosa in the 2022-01-17 gazette, act 8496EC0A. To avoid errors such as "vicosa-/al-secretaria-municipal...", the following line removes it.
-        municipio = re.sub("(\/AL.*|GABINETE DO PREFEITO.*|PODER.*|http.*|PORTARIA.*|Extrato.*|ATA DE.*|SECRETARIA.*|Fundo.*|SETOR.*|ERRATA.*|- AL.*|GABINETE.*)", "", municipio)
-        return municipio
-
-    def _extract_territory_name(self, texto_diario_slice: List[str], num_linha: int):
-        texto = '\n'.join(texto_diario_slice[num_linha:num_linha+10])
-        match = re.findall(self.RE_NOMES_MUNICIPIOS, texto, re.MULTILINE)
-        if len(match) > 0:
-            return match[0].strip().replace('\n', '')
-        return None
-
-    def _format_territory_to_data(self, territory_to_data: Dict[str, Any]):
-        territory_to_data = {
-            (self._clear_state_code(k[0]), self._clear_city_name(k[1])): v for k, v in territory_to_data.items()
+        name_to_fixed = {
+            "MAJOR IZIDORO": "MAJOR ISIDORO",
         }
-        return territory_to_data
-
-    def _clear_city_name(self, name: str):
-        clean_name = name.replace("'", "")
-        clean_name = unicodedata.normalize("NFD", clean_name)
-        clean_name = clean_name.encode("ascii", "ignore").decode("utf-8")
-        clean_name = clean_name.lower()
-        clean_name = clean_name.strip()
-        return clean_name
-
-    def _clear_state_code(self, code: str):
-        return code.lower().strip()
-
-    def _fix_territory_name(self, name: str):
-        #clean_name = "major isidoro" if clean_name == "major izidoro" else clean_name
-        if name == "major izidoro":
-            return "major isidoro"
-        return name
\ No newline at end of file
+        return name_to_fixed.get(clean_name, clean_name)
diff --git a/tasks/__init__.py b/tasks/__init__.py
index bb16ccd..63fd625 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -1,3 +1,4 @@
+from .create_index import create_gazettes_index, create_themed_excerpts_index
 from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
 from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
 from .gazette_text_extraction import extract_text_from_gazettes
@@ -10,3 +11,4 @@
     TextExtractorInterface,
 )
 from .list_gazettes_to_be_processed import get_gazettes_to_be_processed
+from .list_territories import get_territories
diff --git a/tasks/create_index.py b/tasks/create_index.py
new file mode 100644
index 0000000..5e3eedf
--- /dev/null
+++ b/tasks/create_index.py
@@ -0,0 +1,153 @@
+from typing import Dict
+
+from .interfaces import IndexInterface
+
+
+def create_gazettes_index(index: IndexInterface) -> None:
+    body = {
+        "mappings": {
+            "properties": {
+                "created_at": {"type": "date"},
+                "date": {"type": "date"},
+                "edition_number": {
+                    "type": "text",
+                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
+                },
+                "file_checksum": {"type": "keyword"},
+                "file_path": {"type": "keyword"},
+                "file_url": {"type": "keyword"},
+                "id": {"type": "keyword"},
+                "is_extra_edition": {"type": "boolean"},
+                "power": {"type": "keyword"},
+                "processed": {"type": "boolean"},
+                "scraped_at": {"type": "date"},
+                "source_text": {
+                    "type": "text",
+                    "analyzer": "brazilian",
+                    "index_options": "offsets",
+                    "term_vector": "with_positions_offsets",
+                    "fields": {
+                        "with_stopwords": {
+                            "type": "text",
+                            "analyzer": "brazilian_with_stopwords",
+                            "index_options": "offsets",
+                            "term_vector": "with_positions_offsets",
+                        },
+                        "exact": {
+                            "type": "text",
+                            "analyzer": "exact",
+                            "index_options": "offsets",
+                            "term_vector": "with_positions_offsets",
+                        }
+                    },
+                },
+                "state_code": {"type": "keyword"},
+                "territory_id": {"type": "keyword"},
+                "territory_name": {
+                    "type": "text",
+                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
+                },
+                "url": {"type": "keyword"},
+            }
+        },
+        "settings": {
+            "index": {
+                "sort.field": ["territory_id", "date"],
+                "sort.order": ["asc", "desc"]
+            },
+            "analysis": {
+                "filter": {
+                    "brazilian_stemmer": {
+                        "type": "stemmer",
+                        "language": "brazilian",
+                    }
+                },
+                "analyzer": {
+                    "brazilian_with_stopwords": {
+                        "tokenizer": "standard",
+                        "filter": ["lowercase", "brazilian_stemmer"],
+                    },
+                    "exact": {
+                        "tokenizer": "standard",
+                        "filter": ["lowercase"],
+                    },
+                },
+            }
+        },
+    }
+    index.create_index(body=body)
+
+
+def create_themed_excerpts_index(theme: Dict, index: IndexInterface) -> None:
+    body = {
+        "mappings": {
+            "properties": {
+                "excerpt_embedding_score": {"type": "rank_feature"},
+                "excerpt_subthemes": {"type": "keyword"},
+                "excerpt_entities": {"type": "keyword"},
+                "excerpt": {
+                    "type": "text",
+                    "analyzer": "brazilian",
+                    "index_options": "offsets",
+                    "term_vector": "with_positions_offsets",
+                    "fields": {
+                        "with_stopwords": {
+                            "type": "text",
+                            "analyzer": "brazilian_with_stopwords",
+                            "index_options": "offsets",
+                            "term_vector": "with_positions_offsets",
+                        },
+                        "exact": {
+                            "type": "text",
+                            "analyzer": "exact",
+                            "index_options": "offsets",
+                            "term_vector": "with_positions_offsets",
+                        },
+                    },
+                },
+                "excerpt_id": {"type": "keyword"},
+                "source_database_id": {"type": "long"},
+                "source_index_id": {"type": "keyword"},
+                "source_created_at": {"type": "date"},
+                "source_date": {"type": "date"},
+                "source_edition_number": {"type": "keyword"},
+                "source_file_checksum": {"type": "keyword"},
+                "source_file_path": {"type": "keyword"},
+                "source_file_raw_txt": {"type": "keyword"},
+                "source_file_url": {"type": "keyword"},
+                "source_is_extra_edition": {"type": "boolean"},
+                "source_power": {"type": "keyword"},
+                "source_processed": {"type": "boolean"},
+                "source_scraped_at": {"type": "date"},
+                "source_state_code": {"type": "keyword"},
+                "source_territory_id": {"type": "keyword"},
+                "source_territory_name": {"type": "keyword"},
+                "source_url": {"type": "keyword"},
+            }
+        },
+        "settings": {
+            "index": {
+                "sort.field": ["source_territory_id", "source_date"],
+                "sort.order": ["asc", "desc"]
+            },
+            "analysis": {
+                "filter": {
+                    "brazilian_stemmer": {
+                        "type": "stemmer",
+                        "language": "brazilian",
+                    }
+                },
+                "analyzer": {
"brazilian_with_stopwords": { + "tokenizer": "standard", + "filter": ["lowercase", "brazilian_stemmer"], + }, + "exact": { + "tokenizer": "standard", + "filter": ["lowercase"], + }, + }, + } + }, + } + index.create_index(index_name=theme["index"], body=body) diff --git a/tasks/gazette_text_extraction.py b/tasks/gazette_text_extraction.py index 495278d..603547e 100644 --- a/tasks/gazette_text_extraction.py +++ b/tasks/gazette_text_extraction.py @@ -2,9 +2,8 @@ import tempfile import os from pathlib import Path -from typing import Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Union from segmentation import get_segmenter -from tasks.utils import get_territory_to_data from .interfaces import ( DatabaseInterface, @@ -15,7 +14,8 @@ def extract_text_from_gazettes( - gazettes: Iterable[Dict], + gazettes: Iterable[Dict[str, Any]], + territories: Iterable[Dict[str, Any]], database: DatabaseInterface, storage: StorageInterface, index: IndexInterface, @@ -25,39 +25,27 @@ def extract_text_from_gazettes( Extracts the text from a list of gazettes """ logging.info("Starting text extraction from gazettes") - create_index(index) - territory_to_data = get_territory_to_data(database) ids = [] - association_ids = [] for gazette in gazettes: try: - if str(gazette["territory_id"][-4:]).strip() == "0000": - association_ids = try_process_gazette_association_file( - gazette, database, storage, index, text_extractor, territory_to_data - ) - else: - processed_gazette = try_process_gazette_file( - gazette, database, storage, index, text_extractor - ) - + document_ids = try_process_gazette_file( + gazette, territories, database, storage, index, text_extractor + ) except Exception as e: logging.warning( f"Could not process gazette: {gazette['file_path']}. Cause: {e}" ) logging.exception(e) else: - if association_ids: - ids += [association["file_checksum"] for association in association_ids.copy()] - association_ids.clear() - else: - ids.append(processed_gazette["file_checksum"]) + ids.extend(document_ids) return ids def try_process_gazette_file( gazette: Dict, + territories: Iterable[Dict[str, Any]], database: DatabaseInterface, storage: StorageInterface, index: IndexInterface, @@ -68,149 +56,74 @@ def try_process_gazette_file( """ logging.debug(f"Processing gazette {gazette['file_path']}") gazette_file = download_gazette_file(gazette, storage) - get_gazette_text_and_define_url(gazette, gazette_file, text_extractor) - upload_gazette_raw_text(gazette, storage) - index.index_document(gazette, document_id=gazette["file_checksum"]) + gazette["source_text"] = try_to_extract_content(gazette_file, text_extractor) + gazette["url"] = define_file_url(gazette["file_path"]) + gazette_txt_path = define_gazette_txt_path(gazette) + gazette["file_raw_txt"] = define_file_url(gazette_txt_path) + upload_raw_text(gazette_txt_path, gazette["source_text"], storage) delete_gazette_files(gazette_file) - set_gazette_as_processed(gazette, database) - return gazette + document_ids = [] + if gazette_type_is_aggregated(gazette): + segmenter = get_segmenter(gazette["territory_id"], territories) + territory_segments = segmenter.get_gazette_segments(gazette) + + for segment in territory_segments: + segment_txt_path = define_segment_txt_path(segment) + segment["file_raw_txt"] = define_file_url(segment_txt_path) + upload_raw_text(segment_txt_path, segment["source_text"], storage) + index.index_document(segment, document_id=segment["file_checksum"]) + document_ids.append(segment["file_checksum"]) + else: + 
+        index.index_document(gazette, document_id=gazette["file_checksum"])
+        document_ids.append(gazette["file_checksum"])
+    set_gazette_as_processed(gazette, database)
+    return document_ids
 
-def try_process_gazette_association_file(
-    gazette: Dict,
-    database: DatabaseInterface,
-    storage: StorageInterface,
-    index: IndexInterface,
-    text_extractor: TextExtractorInterface,
-    territory_to_data: Dict,
-) -> List:
+
+def gazette_type_is_aggregated(gazette: Dict):
     """
-    Do all the work to extract the content from the gazette files
+    Checks if gazette contains publications by more than one city.
+
+    Currently, this is being done by verifying if the territory_id ends in "00000".
+    This is a special code we are using for gazettes from associations of cities from a
+    state.
+
+    E.g. if cities from Alagoas have their territory_id's starting with "27", an
+    association file will be given territory_id "2700000" and will be detected.
     """
+    return str(gazette["territory_id"][-5:]).strip() == "00000"
 
-    logging.debug(f"Processing gazette {gazette['file_path']}")
-    pdf = download_gazette_file(gazette, storage)
-    get_gazette_text_and_define_url(gazette, pdf, text_extractor)
-    upload_gazette_raw_text(gazette, storage)
-    pdf_txt = try_to_extract_content(pdf, text_extractor)
-    gazette["source_text"] = pdf_txt
-    segmenter = get_segmenter(gazette["territory_id"], gazette, territory_to_data)
-    diarios = segmenter.get_gazette_segments()
 
+def upload_raw_text(path: Union[str, Path], content: str, storage: StorageInterface):
+    """
+    Upload gazette raw text file
+    """
+    storage.upload_content(path, content)
+    logging.debug(f"Raw text uploaded {path}")
 
-    for diario in diarios:
-        upload_gazette_raw_text_association(diario, storage)
-        index.index_document(diario, document_id=diario["file_checksum"])
-
-    delete_gazette_files(pdf)
-    set_gazette_as_processed(gazette, database)
-    return diarios
-
-
-def create_index(index: IndexInterface) -> None:
-    body = {
-        "mappings": {
-            "properties": {
-                "created_at": {"type": "date"},
-                "date": {"type": "date"},
-                "edition_number": {
-                    "type": "text",
-                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
-                },
-                "file_checksum": {"type": "keyword"},
-                "file_path": {"type": "keyword"},
-                "file_url": {"type": "keyword"},
-                "id": {"type": "keyword"},
-                "is_extra_edition": {"type": "boolean"},
-                "power": {"type": "keyword"},
-                "processed": {"type": "boolean"},
-                "scraped_at": {"type": "date"},
-                "source_text": {
-                    "type": "text",
-                    "analyzer": "brazilian",
-                    "index_options": "offsets",
-                    "term_vector": "with_positions_offsets",
-                    "fields": {
-                        "with_stopwords": {
-                            "type": "text",
-                            "analyzer": "brazilian_with_stopwords",
-                            "index_options": "offsets",
-                            "term_vector": "with_positions_offsets",
-                        },
-                        "exact": {
-                            "type": "text",
-                            "analyzer": "exact",
-                            "index_options": "offsets",
-                            "term_vector": "with_positions_offsets",
-                        }
-                    },
-                },
-                "state_code": {"type": "keyword"},
-                "territory_id": {"type": "keyword"},
-                "territory_name": {
-                    "type": "text",
-                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
-                },
-                "url": {"type": "keyword"},
-            }
-        },
-        "settings": {
-            "index": {
-                "sort.field": ["territory_id", "date"],
-                "sort.order": ["asc", "desc"]
-            },
-            "analysis": {
-                "filter": {
-                    "brazilian_stemmer": {
-                        "type": "stemmer",
-                        "language": "brazilian",
-                    }
-                },
-                "analyzer": {
-                    "brazilian_with_stopwords": {
-                        "tokenizer": "standard",
-                        "filter": ["lowercase", "brazilian_stemmer"],
-                    },
-                    "exact": {
-                        "tokenizer": "standard",
-                        "filter": ["lowercase"],
-                    },
-                },
-            }
-        },
-    }
-    index.create_index(body=body)
-
-
-def upload_gazette_raw_text(gazette: Dict, storage):
+def define_gazette_txt_path(gazette: Dict):
     """
-    Define gazette raw text
+    Defines the gazette txt path in the storage
     """
-    file_raw_txt = Path(gazette["file_path"]).with_suffix(".txt").as_posix()
-    storage.upload_content(file_raw_txt, gazette["source_text"])
-    logging.debug(f"file_raw_txt uploaded {file_raw_txt}")
-    file_endpoint = get_file_endpoint()
-    gazette["file_raw_txt"] = f"{file_endpoint}/{file_raw_txt}"
+    return str(Path(gazette["file_path"]).with_suffix(".txt").as_posix())
 
 
-def upload_gazette_raw_text_association(gazette: Dict, storage):
+def define_segment_txt_path(segment: Dict):
     """
-    Define gazette raw text and define the url to access the file in the storage
+    Defines the segment txt path in the storage
    """
-    storage.upload_content(gazette["file_raw_txt"], gazette["source_text"])
-    file_endpoint = get_file_endpoint()
-    gazette["file_raw_txt"] = f"{file_endpoint}{gazette['file_raw_txt']}"
+    return f"{segment['territory_id']}/{segment['date']}/{segment['file_checksum']}.txt"
 
 
-def get_gazette_text_and_define_url(
-    gazette: Dict, gazette_file: str, text_extractor: TextExtractorInterface
-):
+def define_file_url(path: str):
     """
-    Extract file content and define the url to access the file in the storage
+    Joins the storage endpoint with the path to form the URL
     """
-    gazette["source_text"] = try_to_extract_content(gazette_file, text_extractor)
     file_endpoint = get_file_endpoint()
-    gazette["url"] = f"{file_endpoint}/{gazette['file_path']}"
+    return f"{file_endpoint}/{path}"
 
 
 def get_file_endpoint() -> str:
diff --git a/tasks/gazette_themed_excerpts_extraction.py b/tasks/gazette_themed_excerpts_extraction.py
index 1e87c89..f31d9a6 100644
--- a/tasks/gazette_themed_excerpts_extraction.py
+++ b/tasks/gazette_themed_excerpts_extraction.py
@@ -8,8 +8,6 @@
 def extract_themed_excerpts_from_gazettes(
     theme: Dict, gazette_ids: List[str], index: IndexInterface
 ) -> List[str]:
-    create_index(theme, index)
-
     ids = []
     for theme_query in theme["queries"]:
         for excerpt in get_excerpts_from_gazettes_with_themed_query(
@@ -31,81 +29,6 @@
     return ids
 
 
-def create_index(theme: Dict, index: IndexInterface) -> None:
-    body = {
-        "mappings": {
-            "properties": {
-                "excerpt_embedding_score": {"type": "rank_feature"},
-                "excerpt_subthemes": {"type": "keyword"},
-                "excerpt_entities": {"type": "keyword"},
-                "excerpt": {
-                    "type": "text",
-                    "analyzer": "brazilian",
-                    "index_options": "offsets",
-                    "term_vector": "with_positions_offsets",
-                    "fields": {
-                        "with_stopwords": {
-                            "type": "text",
-                            "analyzer": "brazilian_with_stopwords",
-                            "index_options": "offsets",
-                            "term_vector": "with_positions_offsets",
-                        },
-                        "exact": {
-                            "type": "text",
-                            "analyzer": "exact",
-                            "index_options": "offsets",
-                            "term_vector": "with_positions_offsets",
-                        },
-                    },
-                },
-                "excerpt_id": {"type": "keyword"},
-                "source_database_id": {"type": "long"},
-                "source_index_id": {"type": "keyword"},
-                "source_created_at": {"type": "date"},
-                "source_date": {"type": "date"},
-                "source_edition_number": {"type": "keyword"},
-                "source_file_checksum": {"type": "keyword"},
-                "source_file_path": {"type": "keyword"},
-                "source_file_raw_txt": {"type": "keyword"},
-                "source_file_url": {"type": "keyword"},
-                "source_is_extra_edition": {"type": "boolean"},
-                "source_power": {"type": "keyword"},
-                "source_processed": {"type": "boolean"},
-                "source_scraped_at": {"type": "date"},
-                "source_state_code": {"type": "keyword"},
"keyword"}, - "source_territory_id": {"type": "keyword"}, - "source_territory_name": {"type": "keyword"}, - "source_url": {"type": "keyword"}, - } - }, - "settings": { - "index": { - "sort.field": ["source_territory_id", "source_date"], - "sort.order": ["asc", "desc"] - }, - "analysis": { - "filter": { - "brazilian_stemmer": { - "type": "stemmer", - "language": "brazilian", - } - }, - "analyzer": { - "brazilian_with_stopwords": { - "tokenizer": "standard", - "filter": ["lowercase", "brazilian_stemmer"], - }, - "exact": { - "tokenizer": "standard", - "filter": ["lowercase"], - }, - }, - } - }, - } - index.create_index(index_name=theme["index"], body=body) - - def get_excerpts_from_gazettes_with_themed_query( query: Dict, gazette_ids: List[str], index: IndexInterface ) -> Iterable[Dict]: diff --git a/tasks/list_territories.py b/tasks/list_territories.py new file mode 100644 index 0000000..ab7d663 --- /dev/null +++ b/tasks/list_territories.py @@ -0,0 +1,28 @@ +from functools import lru_cache +from typing import Dict, Iterable +from .interfaces import DatabaseInterface + + +@lru_cache +def get_territories( + database: DatabaseInterface, +) -> Iterable[Dict]: + """ + Example + ------- + >>> territories = get_territories_gazettes(database) + """ + command = """SELECT * FROM territories;""" + territories = [ + _format_territories_data(territory) for territory in database.select(command) + ] + return territories + + +def _format_territories_data(data): + return { + "id": data[0], + "territory_name": data[1], + "state_code": data[2], + "state": data[3], + } diff --git a/tasks/utils/__init__.py b/tasks/utils/__init__.py index e072e0f..c890c54 100644 --- a/tasks/utils/__init__.py +++ b/tasks/utils/__init__.py @@ -2,11 +2,14 @@ get_documents_from_query_with_highlights, get_documents_with_ids, ) +from .python import ( + batched, +) from .text import ( clean_extra_whitespaces, get_checksum, ) - from .territories import ( - get_territory_to_data, -) \ No newline at end of file + get_territory_slug, + get_territory_data, +) diff --git a/tasks/utils/python.py b/tasks/utils/python.py new file mode 100644 index 0000000..fc6e4ea --- /dev/null +++ b/tasks/utils/python.py @@ -0,0 +1,10 @@ +from itertools import islice + + +def batched(iterable, n): + # batched('ABCDEFG', 3) --> ABC DEF G + if n < 1: + raise ValueError('n must be at least one') + it = iter(iterable) + while batch := tuple(islice(it, n)): + yield batch diff --git a/tasks/utils/territories.py b/tasks/utils/territories.py index 9ad5120..e77235c 100644 --- a/tasks/utils/territories.py +++ b/tasks/utils/territories.py @@ -1,34 +1,42 @@ -from typing import Dict, Iterable -from ..interfaces import DatabaseInterface - - -def get_territory_to_data(database: DatabaseInterface): - territories = _get_territories_gazettes(database) - territory_to_data = { - ((t["state_code"], t["territory_name"])): t - for t in territories - } - return territory_to_data - - -def _get_territories_gazettes( - database: DatabaseInterface, -) -> Iterable[Dict]: - """ - Example - ------- - >>> territories = get_territories_gazettes(database) - """ - command = """SELECT * FROM territories;""" - territories = [ - _format_territories_data(territory) for territory in database.select(command) - ] - return territories - - -def _format_territories_data(data): - return { - "id": data[0], - "territory_name": data[1], - "state_code": data[2], - } +from typing import Any, Dict, Iterable, Tuple, Union + +from slugify import slugify + + +_territory_slug_to_data_map = {} + + +def 
+    full_name = f"{state_code} {name}"
+    stopwords = ["de", "d", "da", "do", "das", "dos"]
+    replacements = [("´", "'"), ("`", "'")]
+    return slugify(full_name, separator="", stopwords=stopwords, replacements=replacements)
+
+
+def get_territory_data(identifier: Union[str, Tuple[str, str]], territories: Iterable[Dict[str, Any]]) -> Dict[str, Dict]:
+    if isinstance(identifier, tuple):
+        territory_name, state_code = identifier
+        territory_slug = get_territory_slug(territory_name, state_code)
+    elif isinstance(identifier, str):
+        territory_slug = identifier
+    else:
+        raise TypeError(f"Identifier must be 'str' or 'tuple'. Got: {type(identifier)}")
+
+    slug_to_data = get_territory_slug_to_data_map(territories)
+
+    if territory_slug not in slug_to_data:
+        raise KeyError(f"Couldn't find info for \"{territory_slug}\"")
+
+    return slug_to_data[territory_slug]
+
+
+def get_territory_slug_to_data_map(territories: Iterable[Dict[str, Any]]) -> Dict[str, Dict]:
+    global _territory_slug_to_data_map
+
+    if not _territory_slug_to_data_map:
+        territory_to_data = {
+            get_territory_slug(t["territory_name"], t["state_code"]): t
+            for t in territories
+        }
+        _territory_slug_to_data_map = territory_to_data
+
+    return _territory_slug_to_data_map