diff --git a/data_extraction/__init__.py b/data_extraction/__init__.py index c8d36f9..96c887b 100644 --- a/data_extraction/__init__.py +++ b/data_extraction/__init__.py @@ -1 +1,2 @@ +from .interfaces import TextExtractorInterface from .text_extraction import ApacheTikaTextExtractor, create_apache_tika_text_extraction diff --git a/data_extraction/interfaces.py b/data_extraction/interfaces.py new file mode 100644 index 0000000..b708f4b --- /dev/null +++ b/data_extraction/interfaces.py @@ -0,0 +1,8 @@ +import abc + +class TextExtractorInterface(abc.ABC): + @abc.abstractmethod + def extract_text(self, filepath: str) -> str: + """ + Extract the text from the given file + """ diff --git a/data_extraction/text_extraction.py b/data_extraction/text_extraction.py index 5ed87df..4de0f1b 100644 --- a/data_extraction/text_extraction.py +++ b/data_extraction/text_extraction.py @@ -5,7 +5,7 @@ import requests -from tasks import TextExtractorInterface +from .interfaces import TextExtractorInterface class ApacheTikaTextExtractor(TextExtractorInterface): diff --git a/database/__init__.py b/database/__init__.py index 88f24c0..b6da446 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -1 +1,2 @@ +from .interfaces import DatabaseInterface from .postgresql import PostgreSQL, create_database_interface diff --git a/database/interfaces.py b/database/interfaces.py new file mode 100644 index 0000000..d555d27 --- /dev/null +++ b/database/interfaces.py @@ -0,0 +1,39 @@ +from typing import Dict, Iterable, Tuple +import abc + + +class DatabaseInterface(abc.ABC): + """ + Interface to abstract the iteraction with the database storing data used by the + tasks + """ + + @abc.abstractmethod + def _commit_changes(self, command: str, data: Dict) -> None: + """ + Make a change in the database and commit it + """ + + @abc.abstractmethod + def select(self, command: str) -> Iterable[Tuple]: + """ + Select entries from the database + """ + + @abc.abstractmethod + def insert(self, command: str, data: Dict) -> None: + """ + Insert entries into the database + """ + + @abc.abstractmethod + def update(self, command: str, data: Dict) -> None: + """ + Update entries from the database + """ + + @abc.abstractmethod + def delete(self, command: str, data: Dict) -> None: + """ + Delete entries from the database + """ diff --git a/database/postgresql.py b/database/postgresql.py index d353630..dd34674 100644 --- a/database/postgresql.py +++ b/database/postgresql.py @@ -4,7 +4,7 @@ import psycopg2 -from tasks import DatabaseInterface +from .interfaces import DatabaseInterface def get_database_name(): diff --git a/index/__init__.py b/index/__init__.py index 8a62430..78d0d60 100644 --- a/index/__init__.py +++ b/index/__init__.py @@ -1 +1,2 @@ +from .interfaces import IndexInterface from .opensearch import create_index_interface diff --git a/index/interfaces.py b/index/interfaces.py new file mode 100644 index 0000000..41a80cd --- /dev/null +++ b/index/interfaces.py @@ -0,0 +1,44 @@ +from typing import Dict, Iterable +import abc + + +class IndexInterface(abc.ABC): + """ + Interface to abstract the interaction with the index system + """ + + @abc.abstractmethod + def create_index(self, index_name: str, body: Dict) -> None: + """ + Create the index used by the application + """ + + @abc.abstractmethod + def refresh_index(self, index_name: str) -> None: + """ + Refreshes the index to make it up-to-date for future searches + """ + + @abc.abstractmethod + def index_document( + self, document: Dict, document_id: str, index: str, refresh: bool + ) -> None: + """ + Upload document to the index + """ + + @abc.abstractmethod + def search(self, query: Dict, index: str) -> Dict: + """ + Searches the index with the provided query + """ + + @abc.abstractmethod + def paginated_search( + self, query: Dict, index: str, keep_alive: str + ) -> Iterable[Dict]: + """ + Searches the index with the provided query, with pagination + """ + + diff --git a/index/opensearch.py b/index/opensearch.py index 6684bd2..08fed97 100644 --- a/index/opensearch.py +++ b/index/opensearch.py @@ -3,7 +3,7 @@ import opensearchpy -from tasks import IndexInterface +from .interfaces import IndexInterface class OpenSearchInterface(IndexInterface): diff --git a/main/__init__.py b/main/__init__.py index 0a76ba4..0285851 100644 --- a/main/__init__.py +++ b/main/__init__.py @@ -1,5 +1,4 @@ from .__main__ import ( is_debug_enabled, enable_debug_if_necessary, - start_to_process_pending_gazettes, ) diff --git a/main/__main__.py b/main/__main__.py index 5f9951c..cd1da3f 100644 --- a/main/__main__.py +++ b/main/__main__.py @@ -6,19 +6,7 @@ from database import create_database_interface from storage import create_storage_interface from index import create_index_interface -from tasks import ( - create_aggregates, - create_gazettes_index, - create_aggregates_table, - create_themed_excerpts_index, - embedding_rerank_excerpts, - extract_text_from_gazettes, - extract_themed_excerpts_from_gazettes, - get_gazettes_to_be_processed, - get_themes, - get_territories, - tag_entities_in_excerpts, -) +from tasks import run_task def is_debug_enabled(): @@ -44,30 +32,27 @@ def gazette_texts_pipeline(): storage = create_storage_interface() index = create_index_interface() text_extractor = create_apache_tika_text_extraction() - themes = get_themes() - - create_gazettes_index(index) - territories = get_territories(database) - gazettes_to_be_processed = get_gazettes_to_be_processed(execution_mode, database) - indexed_gazette_ids = extract_text_from_gazettes( - gazettes_to_be_processed, territories, database, storage, index, text_extractor - ) + + themes = run_task("get_themes") + + run_task("create_gazettes_index", index) + territories = run_task("get_territories", database) + gazettes_to_be_processed = run_task("get_gazettes_to_be_processed", execution_mode, database) + indexed_gazette_ids = run_task("extract_text_from_gazettes", gazettes_to_be_processed, territories, database, storage, index, text_extractor) for theme in themes: - create_themed_excerpts_index(theme, index) - themed_excerpt_ids = extract_themed_excerpts_from_gazettes( - theme, indexed_gazette_ids, index - ) - embedding_rerank_excerpts(theme, themed_excerpt_ids, index) - tag_entities_in_excerpts(theme, themed_excerpt_ids, index) + run_task("create_themed_excerpts_index", theme, index) + themed_excerpt_ids = run_task("extract_themed_excerpts_from_gazettes", theme, indexed_gazette_ids, index) + run_task("embedding_rerank_excerpts", theme, themed_excerpt_ids, index) + run_task("tag_entities_in_excerpts", theme, themed_excerpt_ids, index) def aggregates_pipeline(): database = create_database_interface() storage = create_storage_interface() - create_aggregates_table(database) - create_aggregates(database, storage) + run_task("create_aggregates_table", database) + run_task("create_aggregates", database, storage) def execute_pipeline(pipeline): diff --git a/requirements.txt b/requirements.txt index 01dc91c..49134ef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ scikit-learn==1.0.2 sentence-transformers==2.2.0 huggingface-hub==0.10.1 # fix: https://github.com/UKPLab/sentence-transformers/issues/1762 python-slugify[unidecode]==8.0.1 +numpy==1.26.4 # fix numpy dtype size changed error with numpy>=2.0.0 diff --git a/scripts/Dockerfile b/scripts/Dockerfile index 139d337..ebfe2b8 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/python:3.8 +FROM docker.io/python:3.9 ENV USER gazette ENV USER_HOME /home/$USER @@ -16,8 +16,8 @@ ENV PYTHONPATH $WORKDIR COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY . $WORKDIR -WORKDIR $WORKDIR USER $USER - RUN python -c "import sentence_transformers; sentence_transformers.SentenceTransformer('neuralmind/bert-base-portuguese-cased').save('"$USER_HOME"/models/bert-base-portuguese-cased')" + +COPY . $WORKDIR +WORKDIR $WORKDIR diff --git a/storage/__init__.py b/storage/__init__.py index c74de73..8d9c47c 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -1 +1,2 @@ from .digital_ocean_spaces import DigitalOceanSpaces, create_storage_interface +from .interfaces import StorageInterface diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py index 792abd5..99b738a 100644 --- a/storage/digital_ocean_spaces.py +++ b/storage/digital_ocean_spaces.py @@ -1,12 +1,12 @@ import logging import os -from typing import Generator, Union +from typing import Union from io import BytesIO from pathlib import Path import boto3 -from tasks import StorageInterface +from .interfaces import StorageInterface def get_storage_region(): @@ -91,6 +91,67 @@ def upload_content( content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission} ) + def upload_file( + self, + file_key: str, + file_path: str, + permission: str = "public-read", + ) -> None: + logging.debug(f"Uploading {file_key}") + self._client.upload_file( + file_path, self._bucket, file_key, ExtraArgs={"ACL": permission} + ) + + def upload_file_multipart( + self, + file_key: str, + file_path: str, + permission: str = "public-read", + part_size: int = 100 * 1024 * 1024, + ) -> None: + logging.debug(f"Uploading {file_key} with multipart") + + multipart_upload = self._client.create_multipart_upload(Bucket=self._bucket, Key=file_key, ACL=permission) + upload_id = multipart_upload['UploadId'] + + parts = [] + + try: + with open(file_path, 'rb') as file: + part_number = 1 + while True: + data = file.read(part_size) + if not data: + break + + response = self._client.upload_part( + Bucket=self._bucket, + Key=file_key, + PartNumber=part_number, + UploadId=upload_id, + Body=data + ) + + parts.append({ + 'PartNumber': part_number, + 'ETag': response['ETag'] + }) + part_number += 1 + + self._client.complete_multipart_upload( + Bucket=self._bucket, + Key=file_key, + UploadId=upload_id, + MultipartUpload={'Parts': parts} + ) + + except Exception as e: + logging.debug(f"Aborted uploading {file_key} with multipart") + self._client.abort_multipart_upload(Bucket=self._bucket, Key=file_key, UploadId=upload_id) + raise e + else: + logging.debug(f"Finished uploading {file_key} with multipart") + def copy_file(self, source_file_key: str, destination_file_key: str) -> None: logging.debug(f"Copying {source_file_key} to {destination_file_key}") self._client.copy_object( diff --git a/storage/interfaces.py b/storage/interfaces.py new file mode 100644 index 0000000..1578752 --- /dev/null +++ b/storage/interfaces.py @@ -0,0 +1,34 @@ +from typing import Union +from pathlib import Path +import abc +from io import BytesIO + + +class StorageInterface(abc.ABC): + """ + Interface to abstract the interaction with the object store system. + """ + + @abc.abstractmethod + def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None: + """ + Download the given file key in the destination on the host + """ + + @abc.abstractmethod + def upload_content(self, file_key: str, content_to_be_uploaded: Union[str, BytesIO]) -> None: + """ + Upload the given content to the destination on the host + """ + + @abc.abstractmethod + def copy_file(self, source_file_key: str, destination_file_key: str) -> None: + """ + Copy the given source file to the destination place on the host + """ + + @abc.abstractmethod + def delete_file(self, file_key: str) -> None: + """ + Delete a file on the host. + """ diff --git a/tasks/__init__.py b/tasks/__init__.py index 643a257..43a5e3f 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,16 +1,23 @@ -from .create_index import create_gazettes_index, create_themed_excerpts_index -from .create_aggregates_table import create_aggregates_table -from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts -from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts -from .gazette_text_extraction import extract_text_from_gazettes -from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes -from .gazette_themes_listing import get_themes -from .gazette_txt_to_xml import create_aggregates -from .interfaces import ( - DatabaseInterface, - StorageInterface, - IndexInterface, - TextExtractorInterface, -) -from .list_gazettes_to_be_processed import get_gazettes_to_be_processed -from .list_territories import get_territories +from importlib import import_module + + +AVAILABLE_TASKS = { + "create_aggregates": "tasks.gazette_txt_to_xml", + "create_gazettes_index": "tasks.create_index", + "create_aggregates_table": "tasks.create_aggregates_table", + "create_themed_excerpts_index": "tasks.create_index", + "embedding_rerank_excerpts": "tasks.gazette_excerpts_embedding_reranking", + "extract_text_from_gazettes": "tasks.gazette_text_extraction", + "extract_themed_excerpts_from_gazettes": "tasks.gazette_themed_excerpts_extraction", + "get_gazettes_to_be_processed": "tasks.list_gazettes_to_be_processed", + "get_themes": "tasks.gazette_themes_listing", + "get_territories": "tasks.list_territories", + "tag_entities_in_excerpts": "tasks.gazette_excerpts_entities_tagging", +} + + +def run_task(task_name: str, *args, **kwargs): + module = AVAILABLE_TASKS[task_name] + mod = import_module(module) + task = getattr(mod, task_name) + return task(*args, **kwargs) diff --git a/tasks/create_aggregates_table.py b/tasks/create_aggregates_table.py index b28dc0e..17d169b 100644 --- a/tasks/create_aggregates_table.py +++ b/tasks/create_aggregates_table.py @@ -1,4 +1,4 @@ -from .interfaces import DatabaseInterface +from database import DatabaseInterface def create_aggregates_table(database: DatabaseInterface): diff --git a/tasks/create_index.py b/tasks/create_index.py index 5e3eedf..7abd8f0 100644 --- a/tasks/create_index.py +++ b/tasks/create_index.py @@ -1,6 +1,6 @@ from typing import Dict -from .interfaces import IndexInterface +from index import IndexInterface def create_gazettes_index(index: IndexInterface) -> None: diff --git a/tasks/gazette_excerpts_embedding_reranking.py b/tasks/gazette_excerpts_embedding_reranking.py index 3919056..3ffc40a 100644 --- a/tasks/gazette_excerpts_embedding_reranking.py +++ b/tasks/gazette_excerpts_embedding_reranking.py @@ -3,7 +3,7 @@ import sentence_transformers -from .interfaces import IndexInterface +from index import IndexInterface from .utils import get_documents_with_ids diff --git a/tasks/gazette_excerpts_entities_tagging.py b/tasks/gazette_excerpts_entities_tagging.py index 8c67303..c63522e 100644 --- a/tasks/gazette_excerpts_entities_tagging.py +++ b/tasks/gazette_excerpts_entities_tagging.py @@ -1,7 +1,7 @@ import re from typing import Dict, List -from .interfaces import IndexInterface +from index import IndexInterface from .utils import ( get_documents_from_query_with_highlights, get_documents_with_ids, diff --git a/tasks/gazette_text_extraction.py b/tasks/gazette_text_extraction.py index 603547e..18c0c3a 100644 --- a/tasks/gazette_text_extraction.py +++ b/tasks/gazette_text_extraction.py @@ -5,12 +5,10 @@ from typing import Any, Dict, Iterable, List, Union from segmentation import get_segmenter -from .interfaces import ( - DatabaseInterface, - IndexInterface, - StorageInterface, - TextExtractorInterface, -) +from data_extraction import TextExtractorInterface +from database import DatabaseInterface +from index import IndexInterface +from storage import StorageInterface def extract_text_from_gazettes( diff --git a/tasks/gazette_themed_excerpts_extraction.py b/tasks/gazette_themed_excerpts_extraction.py index 86e8f23..1b05882 100644 --- a/tasks/gazette_themed_excerpts_extraction.py +++ b/tasks/gazette_themed_excerpts_extraction.py @@ -1,7 +1,7 @@ import hashlib from typing import Dict, Iterable, List -from .interfaces import IndexInterface +from index import IndexInterface from .utils import batched, clean_extra_whitespaces, get_documents_from_query_with_highlights diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index a0fd853..68491c9 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -1,187 +1,263 @@ +import logging +import os import traceback import xml.etree.cElementTree as ET -import logging -from datetime import datetime, timedelta +from datetime import datetime from io import BytesIO -from xml.dom import minidom -from zipfile import ZipFile, ZIP_DEFLATED from pathlib import Path - -from .utils import hash_content, zip_needs_upsert, get_territory_slug -from .interfaces import StorageInterface,DatabaseInterface +from tempfile import mkstemp, TemporaryDirectory, NamedTemporaryFile +from typing import Iterable +from zipfile import ZIP_DEFLATED, ZipFile from botocore.exceptions import ClientError +from database import DatabaseInterface +from storage import StorageInterface +from .utils import br_timezone, get_territory_slug, hash_file + logger = logging.getLogger(__name__) -br_timezone = timedelta(hours=-3) -def create_aggregates(database:DatabaseInterface, storage:StorageInterface): +def create_aggregates(database: DatabaseInterface, storage: StorageInterface): """ Create xml for all territories available in database """ logger.info("Agregando os arquivos TXT para XML de territórios e estados...") - results_query_states = list(database.select("""SELECT - t.state_code AS code, - json_agg(json_build_object('id',t.id, 'name',t.name)) - FROM - territories t - WHERE - t.id in (SELECT DISTINCT - territory_id - FROM - gazettes - WHERE - territory_id NOT LIKE '%00000' - ) - GROUP BY - code - """ - )) - - for state, territories_list in results_query_states: - try: - create_aggregates_for_territories_and_states(territories_list, state, database, storage) - except Exception as e: - logger.error(f"Erro ao tentar processar municípios de {state}: {e}\n{traceback.format_exc()}") - - continue - - -def create_aggregates_for_territories_and_states(territories_list:list, state:str, database:DatabaseInterface, storage:StorageInterface): + results_query_states = database.select( + """ + SELECT + t.state_code AS code, + g_years.year, + json_agg(json_build_object('id', t.id, 'name', t.name)) + FROM + territories t + INNER JOIN ( + SELECT + date_part('Year', g.date) AS year, + g.territory_id + FROM + gazettes g + GROUP BY + year, + g.territory_id + ) as g_years + ON + g_years.territory_id=t.id + WHERE + t.id NOT LIKE '%00000' + GROUP BY + code, + g_years.year + """ + ) + + for state_code, year, territories_list in results_query_states: + create_aggregates_for_territories_and_state( + state_code, int(year), territories_list, database, storage + ) + + +def create_aggregates_for_territories_and_state( + state_code: str, + year: int, + territories_list: list, + database: DatabaseInterface, + storage: StorageInterface, +): """ - Create a .xml files for each year of gazettes for a territory + Create a .xml files for each year of gazettes for a territory """ - - xml_files_dict = {} - arr_years_update = [] + state_xmls = [] + state_needs_update = True + state_needs_update = False + + with TemporaryDirectory() as tmpdir: + for territory in territories_list: + gazettes_in_territory_year = database.select( + f""" + SELECT + to_jsonb(g) + FROM + gazettes g + WHERE + g.territory_id='{territory['id']}' + and date_part('Year', g.date)={year} + and g.processed=true + ORDER BY + g.date DESC + """ + ) + + xml_info = generate_xml_content( + state_code, + str(year), + territory, + gazettes_in_territory_year, + storage, + tmpdir, + ) + + state_xmls.append(xml_info) + + need_update_territory_zip = zip_needs_upsert(xml_info, database) - for territory in territories_list: - query_content_gazzetes_for_territory = list(database.select(f"""SELECT - date_part('Year', g.date) AS year, - json_agg(g.*) - FROM - gazettes g - WHERE - g.territory_id='{territory['id']}' - and g.processed=true - GROUP BY - year - """ - )) + if need_update_territory_zip: + state_needs_update = True + try_create_zip_for_territory(xml_info, database, storage) - for year, list_gazzetes_content in query_content_gazzetes_for_territory: - year = str(int(year)) + if state_needs_update: + try_create_zip_for_state(state_xmls, state_code, year, database, storage) - meta_xml = xml_content_generate(state, year, territory, list_gazzetes_content, storage) - xml_files_dict.setdefault(year, []).append(meta_xml) +def generate_xml_content( + state_code: str, + year: str, + territory: dict, + gazettes_in_territory_year: Iterable, + storage: StorageInterface, + save_dir: str, +): + """ + Generates xml file with gazzetes content + """ - territory_slug = get_territory_slug(meta_xml['territory_name'], state) - zip_path = f"aggregates/{meta_xml['state_code']}/{territory_slug}_{meta_xml['territory_id']}_{meta_xml['year']}.zip" - hx = hash_content(meta_xml['xml']) + logger.info( + f"Gerando XML para cidade {territory['name']}-{state_code} no ano {year}" + ) - logger.debug(f"Content hash for xml file of {zip_path}: {hx}") + tree = ET.ElementTree(ET.Element("root")) + populate_xml_tree( + tree, state_code, year, territory, gazettes_in_territory_year, storage + ) - need_update_territory_zip = zip_needs_upsert(hx, zip_path, database) + ET.indent(tree, space=" ") - if need_update_territory_zip: - if year not in arr_years_update: - arr_years_update.append(year) - create_zip_for_territory(hx, zip_path, meta_xml, database, storage) - - if arr_years_update: - create_zip_for_state(xml_files_dict, arr_years_update, state, database, storage) - - -def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code, database:DatabaseInterface, storage:StorageInterface): - """ - Creating .zip files for the state with all its territories - """ + xml_file_descriptor, xml_file_path = mkstemp(dir=save_dir) + with os.fdopen(xml_file_descriptor, "w") as xml_file: + tree.write(xml_file, encoding="unicode") - for year in arr_years_update: - logger.info(f"Gerando ZIP do estado {state_code} no ano {year}") - - xmls = xmls_years_dict[year] - - zip_path = f"aggregates/{state_code}/{state_code}_{year}.zip" + xml_info = { + "xml_file_path": xml_file_path, + "hash_info": hash_file(xml_file_path), + "territory_id": territory["id"], + "territory_name": territory["name"], + "state_code": state_code, + "year": year, + } + return xml_info + + +def populate_xml_tree( + tree: ET.ElementTree, + state_code: str, + year: str, + territory: dict, + gazettes_in_territory_year: Iterable, + storage: StorageInterface, +): + root = tree.getroot() + meta_info_tag = ET.SubElement(root, "meta") + ET.SubElement(meta_info_tag, "uf").text = state_code + ET.SubElement(meta_info_tag, "ano_publicacao").text = str(year) + ET.SubElement(meta_info_tag, "municipio").text = territory["name"] + ET.SubElement(meta_info_tag, "municipio_codigo_ibge").text = territory["id"] + all_gazettes_tag = ET.SubElement(root, "diarios") + + for gazette_ in gazettes_in_territory_year: + gazette = gazette_[0] - zip_buffer = BytesIO() - - with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: - for xml_file in xmls: - territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code']) - zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml']) - - zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2) - zip_buffer.seek(0) - zip_buffer_copy = BytesIO(zip_buffer.getvalue()) - zip_buffer_copy.seek(0) - storage.upload_content(zip_path, zip_buffer) - - hx = hash_content(zip_buffer_copy.read()) - - logger.debug(f"Content hash for {zip_path}: {hx}") - - dict_query_info = { - "state_code" : state_code, - "territory_id" : None, - "file_path": zip_path, - "year": year, - "hash_info": hx, - "file_size_mb": zip_size, - "last_updated": datetime.utcnow() + br_timezone, - } - - database.insert("INSERT INTO aggregates \ - (territory_id, state_code, year, file_path, \ - file_size_mb, hash_info, last_updated) \ - VALUES (%(territory_id)s, %(state_code)s, \ - %(year)s, %(file_path)s, %(file_size_mb)s, \ - %(hash_info)s, %(last_updated)s) \ - ON CONFLICT(file_path) \ - DO UPDATE \ - SET state_code = EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \ - hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info) - - zip_buffer.close() - - -def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:DatabaseInterface, storage:StorageInterface): + gazette_tag = ET.SubElement(all_gazettes_tag, "diario") + meta_gazette = ET.SubElement(gazette_tag, "meta_diario") + ET.SubElement(meta_gazette, "url_arquivo_original").text = gazette["file_url"] + ET.SubElement(meta_gazette, "poder").text = gazette["power"] + ET.SubElement(meta_gazette, "edicao_extra").text = ( + "Sim" if gazette["is_extra_edition"] else "Não" + ) + ET.SubElement(meta_gazette, "numero_edicao").text = ( + str(gazette["edition_number"]) + if str(gazette["edition_number"]) is not None + else "Não há" + ) + ET.SubElement(meta_gazette, "data_publicacao").text = datetime.strptime( + gazette["date"], "%Y-%m-%d" + ).strftime("%d/%m/%Y") + + path_arq_bucket = Path(gazette["file_path"]).with_suffix(".txt") + with BytesIO() as file_gazette_txt: + try: + storage.get_file(path_arq_bucket, file_gazette_txt) + except ClientError as e: + logger.warning( + f"Erro na obtenção do conteúdo de texto do diário {path_arq_bucket}: {e}" + ) + continue + + ET.SubElement( + gazette_tag, "conteudo" + ).text = file_gazette_txt.getvalue().decode("utf-8") + + +def try_create_zip_for_territory( + xml_info: dict, + database: DatabaseInterface, + storage: StorageInterface, +): + """ + Try calling create_zip_for_territory or give exception """ - Creating .zip files for the year's territories + try: + create_zip_for_territory(xml_info, database, storage) + except Exception as e: + logger.error( + f"Erro ao tentar criar ZIP de {xml_info['year']} para {xml_info['territory_name']} - {xml_info['state_code']}" + ) + raise e + + +def create_zip_for_territory( + xml_info: dict, + database: DatabaseInterface, + storage: StorageInterface, +): + """ + Creating .zip files for the year's territories """ - logger.info(f"Gerando ZIP do municipio {xml_file['territory_name']}-{xml_file['territory_id']} no ano {xml_file['year']}") + logger.info( + f"Gerando ZIP do municipio {xml_info['territory_name']}-{xml_info['territory_id']} no ano {xml_info['year']}" + ) - zip_buffer = BytesIO() + with NamedTemporaryFile() as zip_tempfile: + with ZipFile(zip_tempfile, "w", ZIP_DEFLATED) as zip_file: + file_name = generate_xml_name(xml_info) + write_file_to_zip(xml_info["xml_file_path"], file_name, zip_file) - with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: - territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code']) - zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml']) - - zip_size = round(zip_buffer.tell() / (1024 * 1024), 2) - zip_buffer.seek(0) + zip_size = round(zip_tempfile.tell() / (1024 * 1024), 2) - try: - storage.upload_content(zip_path, zip_buffer) - except ClientError as e: - logger.error(f"Não foi possível fazer o upload do zip do município {xml_file['territory_id']}:\n{traceback.format_exc()}") - - zip_buffer.close() + territory_slug = get_territory_slug( + xml_info["territory_name"], xml_info["state_code"] + ) + storage_path = f"aggregates/{xml_info['state_code']}/{territory_slug}_{xml_info['territory_id']}_{xml_info['year']}.zip" + try: + storage.upload_file_multipart(storage_path, zip_tempfile.name) + except Exception: + logger.error( + f"Não foi possível fazer o upload do zip do município {xml_info['territory_id']}:\n{traceback.format_exc()}" + ) dict_query_info = { - "state_code" : xml_file['state_code'], - "territory_id" : xml_file['territory_id'], - "file_path": zip_path, - "year": xml_file['year'], - "hash_info": hx, + "state_code": xml_info["state_code"], + "territory_id": xml_info["territory_id"], + "file_path": storage_path, + "year": xml_info["year"], + "hash_info": xml_info["hash_info"], "file_size_mb": zip_size, - "last_updated": datetime.utcnow() + br_timezone, + "last_updated": datetime.now(br_timezone), } - database.insert("INSERT INTO aggregates \ + database.insert( + "INSERT INTO aggregates \ (territory_id, state_code, year, file_path, \ file_size_mb, hash_info, last_updated) \ VALUES (%(territory_id)s, %(state_code)s, \ @@ -190,67 +266,114 @@ def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:Datab ON CONFLICT(file_path) \ DO UPDATE \ SET state_code=EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \ - hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info) - - zip_buffer.close() - - -def xml_content_generate(state:str, year:str, territory:dict, list_gazzetes_content:list, storage:StorageInterface): + hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", + dict_query_info, + ) + + +def try_create_zip_for_state( + state_xmls: list, + state_code: str, + year: int, + database: DatabaseInterface, + storage: StorageInterface, +): + """ + Try calling create_zip_for_state or give exception """ - Generates xml file with gazzetes content + try: + create_zip_for_state(state_xmls, state_code, year, database, storage) + except Exception as e: + logger.error(f"Erro ao tentar criar ZIP de {year} para {state_code}") + raise e + + +def create_zip_for_state( + state_xmls: list, + state_code: str, + year: int, + database: DatabaseInterface, + storage: StorageInterface, +): + """ + Creating .zip files for the state with all its territories """ - - root = ET.Element("root") - xml_file = BytesIO() - logger.info(f"Gerando XML para cidade {territory['name']}-{state} no ano {year}") - - meta_info_tag = ET.SubElement(root, "meta") - ET.SubElement(meta_info_tag, "uf").text = state - ET.SubElement(meta_info_tag, "ano_publicacao").text = str(year) - ET.SubElement(meta_info_tag, "municipio").text = territory['name'] - ET.SubElement(meta_info_tag, "municipio_codigo_ibge").text = territory['id'] - all_gazettes_tag = ET.SubElement(root, "diarios") + logger.info(f"Gerando ZIP do estado {state_code} no ano {year}") - for gazette in list_gazzetes_content: - file_gazette_txt = BytesIO() - path_arq_bucket = Path(gazette['file_path']).with_suffix(".txt") - - try: - storage.get_file(path_arq_bucket, file_gazette_txt) - except ClientError as e: - logger.warning(f"Erro na obtenção do conteúdo de texto do diário do territorio {path_arq_bucket}: {e}") - file_gazette_txt.close() + with NamedTemporaryFile() as zip_tempfile: + with ZipFile(zip_tempfile, "w", ZIP_DEFLATED) as zip_file: + for xml_info in state_xmls: + file_name = generate_xml_name(xml_info) + write_file_to_zip(xml_info["xml_file_path"], file_name, zip_file) - continue + zip_size = round(zip_tempfile.tell() / (1024 * 1024), 2) + hash_info = hash_file(zip_tempfile) - gazette_tag = ET.SubElement(all_gazettes_tag, "diario") - meta_gazette = ET.SubElement(gazette_tag, "meta_diario") - ET.SubElement(meta_gazette, "url_arquivo_original").text = gazette['file_url'] - ET.SubElement(meta_gazette, "poder").text = gazette['power'] - ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette['is_extra_edition'] else 'Não' - ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette['edition_number']) if str(gazette['edition_number']) is not None else "Não há" - ET.SubElement(meta_gazette, "data_publicacao").text = datetime.strftime((datetime.strptime(gazette['date'], "%Y-%m-%d").date()), "%d/%m") - ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') - - file_gazette_txt.close() - - # Format XML file - xml_str = ET.tostring(root, encoding='unicode') - format_xml = minidom.parseString(xml_str).toprettyxml(indent=" ") - xml_bytes = format_xml.encode('utf-8') - - xml_file.write(xml_bytes) - xml_file.seek(0) - - data = { - "xml":xml_file.getvalue(), - "territory_id":territory['id'], - "territory_name":territory['name'], - "state_code":state, - "year":year, + zip_path = f"aggregates/{state_code}/{state_code}_{year}.zip" + storage.upload_file_multipart(zip_path, zip_tempfile.name) + + dict_query_info = { + "state_code": state_code, + "territory_id": None, + "file_path": zip_path, + "year": year, + "hash_info": hash_info, + "file_size_mb": zip_size, + "last_updated": datetime.now(br_timezone), } - xml_file.close() + database.insert( + "INSERT INTO aggregates \ + (territory_id, state_code, year, file_path, \ + file_size_mb, hash_info, last_updated) \ + VALUES (%(territory_id)s, %(state_code)s, \ + %(year)s, %(file_path)s, %(file_size_mb)s, \ + %(hash_info)s, %(last_updated)s) \ + ON CONFLICT(file_path) \ + DO UPDATE \ + SET state_code = EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \ + hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", + dict_query_info, + ) + + +def zip_needs_upsert(xml_info: dict, database: DatabaseInterface): + """ + Verifies if zip need an upsert to the database (update or insert) + """ - return data \ No newline at end of file + identical_xml_in_database = list( + database.select( + f""" + SELECT + hash_info + FROM + aggregates + WHERE + hash_info='{xml_info["hash_info"]}' + and territory_id='{xml_info["territory_id"]}' + and year='{xml_info["year"]}' + """ + ) + ) + return len(identical_xml_in_database) == 0 + + +def generate_xml_name(xml_info): + territory_slug = get_territory_slug( + xml_info["territory_name"], xml_info["state_code"] + ) + return f"{territory_slug}_{xml_info['territory_id']}_{xml_info['year']}.xml" + + +def write_file_to_zip(file_path, name_in_zip, zip_file): + with open(file_path, "rb") as open_xml_file, zip_file.open( + name_in_zip, "w" + ) as xml_in_zip: + chunk_size = 5 * 1024 * 1024 + while True: + chunk = open_xml_file.read(chunk_size) + if not chunk: + break + xml_in_zip.write(chunk) diff --git a/tasks/interfaces.py b/tasks/interfaces.py deleted file mode 100644 index 444e59b..0000000 --- a/tasks/interfaces.py +++ /dev/null @@ -1,119 +0,0 @@ -from typing import Dict, Iterable, Tuple, Union -from pathlib import Path -import abc -from io import BytesIO - - -class DatabaseInterface(abc.ABC): - """ - Interface to abstract the iteraction with the database storing data used by the - tasks - """ - - @abc.abstractmethod - def _commit_changes(self, command: str, data: Dict) -> None: - """ - Make a change in the database and commit it - """ - - @abc.abstractmethod - def select(self, command: str) -> Iterable[Tuple]: - """ - Select entries from the database - """ - - @abc.abstractmethod - def insert(self, command: str, data: Dict) -> None: - """ - Insert entries into the database - """ - - @abc.abstractmethod - def update(self, command: str, data: Dict) -> None: - """ - Update entries from the database - """ - - @abc.abstractmethod - def delete(self, command: str, data: Dict) -> None: - """ - Delete entries from the database - """ - - -class StorageInterface(abc.ABC): - """ - Interface to abstract the interaction with the object store system. - """ - - @abc.abstractmethod - def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None: - """ - Download the given file key in the destination on the host - """ - - @abc.abstractmethod - def upload_content(self, file_key: str, content_to_be_uploaded: Union[str, BytesIO]) -> None: - """ - Upload the given content to the destination on the host - """ - - @abc.abstractmethod - def copy_file(self, source_file_key: str, destination_file_key: str) -> None: - """ - Copy the given source file to the destination place on the host - """ - - @abc.abstractmethod - def delete_file(self, file_key: str) -> None: - """ - Delete a file on the host. - """ - - -class IndexInterface(abc.ABC): - """ - Interface to abstract the interaction with the index system - """ - - @abc.abstractmethod - def create_index(self, index_name: str, body: Dict) -> None: - """ - Create the index used by the application - """ - - @abc.abstractmethod - def refresh_index(self, index_name: str) -> None: - """ - Refreshes the index to make it up-to-date for future searches - """ - - @abc.abstractmethod - def index_document( - self, document: Dict, document_id: str, index: str, refresh: bool - ) -> None: - """ - Upload document to the index - """ - - @abc.abstractmethod - def search(self, query: Dict, index: str) -> Dict: - """ - Searches the index with the provided query - """ - - @abc.abstractmethod - def paginated_search( - self, query: Dict, index: str, keep_alive: str - ) -> Iterable[Dict]: - """ - Searches the index with the provided query, with pagination - """ - - -class TextExtractorInterface(abc.ABC): - @abc.abstractmethod - def extract_text(self, filepath: str) -> str: - """ - Extract the text from the given file - """ diff --git a/tasks/list_gazettes_to_be_processed.py b/tasks/list_gazettes_to_be_processed.py index d6812cc..0dd0984 100644 --- a/tasks/list_gazettes_to_be_processed.py +++ b/tasks/list_gazettes_to_be_processed.py @@ -1,7 +1,7 @@ import logging from typing import Dict, Iterable -from .interfaces import DatabaseInterface +from database import DatabaseInterface def get_gazettes_to_be_processed( diff --git a/tasks/list_territories.py b/tasks/list_territories.py index ab7d663..55f4c32 100644 --- a/tasks/list_territories.py +++ b/tasks/list_territories.py @@ -1,6 +1,7 @@ from functools import lru_cache from typing import Dict, Iterable -from .interfaces import DatabaseInterface + +from database import DatabaseInterface @lru_cache diff --git a/tasks/utils/__init__.py b/tasks/utils/__init__.py index aa40fac..882450b 100644 --- a/tasks/utils/__init__.py +++ b/tasks/utils/__init__.py @@ -1,3 +1,4 @@ +from .datetime import br_timezone from .index import ( get_documents_from_query_with_highlights, get_documents_with_ids, @@ -15,7 +16,5 @@ ) from .hash import ( hash_content, -) -from .need_upsert import ( - zip_needs_upsert, + hash_file, ) diff --git a/tasks/utils/datetime.py b/tasks/utils/datetime.py new file mode 100644 index 0000000..e0028c9 --- /dev/null +++ b/tasks/utils/datetime.py @@ -0,0 +1,4 @@ +from datetime import timedelta, timezone + + +br_timezone = timezone(timedelta(hours=-3)) diff --git a/tasks/utils/hash.py b/tasks/utils/hash.py index f85536b..9e0a139 100644 --- a/tasks/utils/hash.py +++ b/tasks/utils/hash.py @@ -1,11 +1,34 @@ -import hashlib, os +import hashlib def hash_content(content: bytes) -> str: """ - Receives a content of byte format and returns its SHA-256 hash + Receives a content of byte format and returns its md5 hash """ - result_hash = hashlib.sha256(content).hexdigest() + result_hash = hashlib.md5(content).hexdigest() - return result_hash \ No newline at end of file + return result_hash + + +def hash_file(file) -> str: + """ + Generate file md5 hash without hurting memory + """ + hash = hashlib.md5() + chunk_size = 128 * hash.block_size + + if isinstance(file, str): + with open(file, 'rb') as f: + _chunk_hashing(hash, chunk_size, f) + else: + file.seek(0) + _chunk_hashing(hash, chunk_size, file) + file.seek(0) + + return hash.hexdigest() + + +def _chunk_hashing(hash, chunk_size, file): + for chunk in iter(lambda: file.read(chunk_size), b''): + hash.update(chunk) diff --git a/tasks/utils/index.py b/tasks/utils/index.py index 83d769c..fb5639b 100644 --- a/tasks/utils/index.py +++ b/tasks/utils/index.py @@ -1,6 +1,6 @@ from typing import Dict, Iterable, List -from ..interfaces import IndexInterface +from index import IndexInterface def get_documents_with_ids( diff --git a/tasks/utils/need_upsert.py b/tasks/utils/need_upsert.py deleted file mode 100644 index 5cc6a6b..0000000 --- a/tasks/utils/need_upsert.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Union -from ..interfaces import DatabaseInterface - - -def zip_needs_upsert(hx: Union[str, bytes], zip_path:str, database:DatabaseInterface): - """ - Verifies if zip need an upsert to the database (update or insert) - """ - - needs_update_or_inexists = True - - query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \ - WHERE file_path='{zip_path}';")) - - if query_existing_aggregate: - needs_update_or_inexists = hx != query_existing_aggregate[0][0] - - return needs_update_or_inexists \ No newline at end of file diff --git a/tests/digital_ocean_spaces.py b/tests/digital_ocean_spaces.py index 1c0bf30..31ad4f4 100644 --- a/tests/digital_ocean_spaces.py +++ b/tests/digital_ocean_spaces.py @@ -1,15 +1,14 @@ -from unittest import TestCase, expectedFailure -from unittest.mock import patch, sentinel import datetime import hashlib import tempfile +from io import BytesIO +from unittest import TestCase, expectedFailure +from unittest.mock import patch, sentinel -from botocore.stub import Stubber import boto3 -from io import BytesIO +from botocore.stub import Stubber -from storage import DigitalOceanSpaces, create_storage_interface -from tasks import StorageInterface +from storage import DigitalOceanSpaces, StorageInterface, create_storage_interface @patch.dict( diff --git a/tests/opensearch.py b/tests/opensearch.py index f1bb112..2d2e3c9 100644 --- a/tests/opensearch.py +++ b/tests/opensearch.py @@ -1,12 +1,11 @@ +import uuid from datetime import date, datetime from unittest import TestCase, expectedFailure -from unittest.mock import patch, MagicMock -import uuid +from unittest.mock import MagicMock, patch -import opensearch +import opensearchpy -from index.opensearch import OpenSearchInterface, create_index_interface -from tasks import IndexInterface +from index import IndexInterface, OpenSearchInterface, create_index_interface class IndexInterfaceFactoryFunctionTests(TestCase): @@ -92,12 +91,12 @@ def setUp(self): def test_opensearch_should_implement_index_interface(self): self.assertIsInstance(OpenSearchInterface([]), IndexInterface) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_connection(self, opensearch_mock): interface = OpenSearchInterface(["127.0.0.1"]) opensearch_mock.assert_called_once_with(hosts=["127.0.0.1"]) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_creation_should_check_if_index_exists( self, opensearch_mock ): @@ -107,7 +106,7 @@ def test_opensearch_index_creation_should_check_if_index_exists( interface.create_index("querido-diario") interface.search_engine.indices.exists.assert_called_once_with(index="querido-diario") - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_creation_should_failed_when_no_index_is_provided( self, opensearch_mock ): @@ -117,7 +116,7 @@ def test_opensearch_index_creation_should_failed_when_no_index_is_provided( with self.assertRaisesRegex(Exception, "Index name not defined"): interface.create_index() - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_creation_with_default_index_value( self, opensearch_mock ): @@ -129,7 +128,7 @@ def test_opensearch_index_creation_with_default_index_value( interface.create_index() interface.search_engine.indices.exists.assert_called_once_with(index="querido-diario2") - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_default_timeout_should_be_30s( self, opensearch_mock ): @@ -144,7 +143,7 @@ def test_opensearch_index_default_timeout_should_be_30s( timeout="30s", ) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_should_allow_change_default_timeout( self, opensearch_mock ): @@ -159,7 +158,7 @@ def test_opensearch_index_should_allow_change_default_timeout( timeout="2m", ) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_index_creation_should_not_recreate_index_if_it_exists( self, opensearch_mock ): @@ -171,7 +170,7 @@ def test_opensearch_index_creation_should_not_recreate_index_if_it_exists( interface.search_engine.indices.exists.assert_called_once_with(index="querido-diario") interface.search_engine.indices.create.assert_not_called() - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_should_create_index_if_it_does_not_exists( self, opensearch_mock ): @@ -187,7 +186,7 @@ def test_opensearch_should_create_index_if_it_does_not_exists( timeout="30s", ) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_opensearch_should_create_index_with_default_value_with_function_has_no_arguments( self, opensearch_mock ): @@ -205,7 +204,7 @@ def test_opensearch_should_create_index_with_default_value_with_function_has_no_ timeout="30s", ) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_upload_document_to_index(self, opensearch_mock): interface = OpenSearchInterface(["127.0.0.1"]) document_checksum = str(uuid.uuid1()) @@ -216,7 +215,7 @@ def test_upload_document_to_index(self, opensearch_mock): id=self.fake_document["file_checksum"], ) - @patch("opensearch.Opensearch", autospec=True) + @patch("opensearchpy.Opensearch", autospec=True) def test_upload_document_to_index_using_default_index(self, opensearch_mock): interface = OpenSearchInterface( ["127.0.0.1"], default_index="querido-diario2" @@ -249,7 +248,7 @@ def setUp(self): "state_code": "SC", "territory_name": "Gaspar", } - self.search_engine = opensearch.Opensearch(hosts=["127.0.0.1"]) + self.search_engine = opensearchpy.Opensearch(hosts=["127.0.0.1"]) def clean_index(self, index): self.search_engine.delete_by_query( diff --git a/tests/postgresql.py b/tests/postgresql.py index e19ad79..6677173 100644 --- a/tests/postgresql.py +++ b/tests/postgresql.py @@ -6,8 +6,7 @@ import psycopg2 -from database import PostgreSQL, create_database_interface -from tasks import DatabaseInterface +from database import DatabaseInterface, PostgreSQL, create_database_interface def get_database_name(): diff --git a/tests/text_extraction_task_tests.py b/tests/text_extraction_task_tests.py index 5031667..5c1a3fe 100644 --- a/tests/text_extraction_task_tests.py +++ b/tests/text_extraction_task_tests.py @@ -5,10 +5,10 @@ from datetime import date, datetime import tempfile +from data_extraction import TextExtractorInterface from tasks import ( extract_text_pending_gazettes, upload_gazette_raw_text, - TextExtractorInterface, ) diff --git a/tests/text_extraction_tests.py b/tests/text_extraction_tests.py index ec9a4a6..2091650 100644 --- a/tests/text_extraction_tests.py +++ b/tests/text_extraction_tests.py @@ -2,8 +2,7 @@ from unittest.mock import patch, mock_open, MagicMock import os -from data_extraction import ApacheTikaTextExtractor, create_apache_tika_text_extraction -from tasks import TextExtractorInterface +from data_extraction import ApacheTikaTextExtractor, TextExtractorInterface class ApacheTikaTextExtractorTest(TestCase):