From 76ec2d84ea56898b1367881fe0c0f6c7554831ae Mon Sep 17 00:00:00 2001
From: Cristian Furtado
Date: Mon, 20 May 2024 17:05:01 -0300
Subject: [PATCH 01/15] Create script and command to aggregate the .txt files
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Makefile                    |  7 +++++++
 tasks/gazette_txt_to_xml.py | 14 ++++++++++++++
 2 files changed, 21 insertions(+)
 create mode 100644 tasks/gazette_txt_to_xml.py

diff --git a/Makefile b/Makefile
index 7933d28..7c7ca17 100644
--- a/Makefile
+++ b/Makefile
@@ -265,3 +265,10 @@ wait-opensearch:
 publish-tag:
 	podman tag $(IMAGE_NAMESPACE)/$(IMAGE_NAME):${IMAGE_TAG} $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
 	podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
+
+.PHONY: aggregate-gazettes
+# aggregate-gazettes: set-run-variable-values
+# 	podman exec -it $(STORAGE_CONTAINER_NAME) \
+# 	ls
+aggregate-gazettes:
+	python3 tasks/gazette_txt_to_xml.py
\ No newline at end of file
diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py
new file mode 100644
index 0000000..c781620
--- /dev/null
+++ b/tasks/gazette_txt_to_xml.py
@@ -0,0 +1,14 @@
+# from .interfaces import DatabaseInterface, StorageInterface
+# from create_database_interface, create_storage_interfac
+
+def organize_files_by_city_and_date():
+    """
+    Organize the files in the S3 bucket by city and date
+    """
+
+    print("TESTE - Script que agrega os arquivos .txt para .xml")
+
+    pass
+
+if __name__ == "__main__":
+    organize_files_by_city_and_date()
\ No newline at end of file

From d151c8ef085e266a5b4e0827beeddc308120eb3e Mon Sep 17 00:00:00 2001
From: Cristian Furtado
Date: Tue, 21 May 2024 11:41:05 -0300
Subject: [PATCH 02/15] Change the aggregate-gazettes command to use the
 container created to run the pipeline
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: ArthurFerreiraRodrigues
---
 Makefile | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/Makefile b/Makefile
index 7c7ca17..2b84a35 100644
--- a/Makefile
+++ b/Makefile
@@ -267,8 +267,9 @@ publish-tag:
 	podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
 
 .PHONY: aggregate-gazettes
-# aggregate-gazettes: set-run-variable-values
-# 	podman exec -it $(STORAGE_CONTAINER_NAME) \
-# 	ls
-aggregate-gazettes:
-	python3 tasks/gazette_txt_to_xml.py
\ No newline at end of file
+aggregate-gazettes: set-run-variable-values
+	podman run -ti --volume $(CURDIR):/mnt/code:rw \
+		--pod $(POD_NAME) \
+		--env PYTHONPATH=/mnt/code \
+		--env-file envvars \
+		$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python tasks/gazette_txt_to_xml.py
\ No newline at end of file

From ccee5b36cc5878ab95c48ca159632c53aafc8464 Mon Sep 17 00:00:00 2001
From: Cristian Furtado
Date: Fri, 24 May 2024 16:20:43 -0300
Subject: [PATCH 03/15] Add database and bucket instances and exercise some of
 their functions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 storage/digital_ocean_spaces.py | 12 ++++++++++++
 tasks/gazette_txt_to_xml.py     | 18 +++++++++++++++++-
 tasks/interfaces.py             | 12 ++++++++++++
 3 files changed, 41 insertions(+), 1 deletion(-)
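The two methods patch 03 adds below are thin wrappers around boto3's server-side copy_object and delete_object. As a standalone sketch (the endpoint, credentials, and object keys here are illustrative assumptions, not values taken from this series), the underlying calls look like this:

    import boto3

    # All values below are placeholder assumptions; the real endpoint and
    # credentials come from the project's environment variables.
    client = boto3.client(
        "s3",
        region_name="us-east-1",
        endpoint_url="http://localhost:9000",   # e.g. the local MinIO from sample.env
        aws_access_key_id="minio-access-key",
        aws_secret_access_key="minio-secret-key",
    )

    BUCKET = "queridodiariobucket"

    # Server-side copy: the object never round-trips through the client.
    client.copy_object(
        Bucket=BUCKET,
        CopySource={"Bucket": BUCKET, "Key": "2933604/2024-03-01/gazette.txt"},
        Key="2933604/2024/gazette.txt",
    )

    # DeleteObject reports success even when the key does not exist.
    client.delete_object(Bucket=BUCKET, Key="2933604/2024/gazette.txt")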
diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py
index 3e18975..bfcdd99 100644
--- a/storage/digital_ocean_spaces.py
+++ b/storage/digital_ocean_spaces.py
@@ -83,3 +83,15 @@ def upload_content(
         self._client.upload_fileobj(
             f, self._bucket, file_key, ExtraArgs={"ACL": permission}
         )
+
+    def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
+        logging.debug(f"Copying {source_file_key} to {destination_file_key}")
+        self._client.copy_object(
+            Bucket=self._bucket,
+            CopySource={'Bucket': self._bucket, 'Key': source_file_key},
+            Key=destination_file_key
+        )
+
+    def delete_file(self, file_key: str) -> None:
+        logging.debug(f"Deletando {file_key}")
+        self._client.delete_object(Bucket=self._bucket, Key=file_key)
diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py
index c781620..d59a40d 100644
--- a/tasks/gazette_txt_to_xml.py
+++ b/tasks/gazette_txt_to_xml.py
@@ -1,13 +1,29 @@
 # from .interfaces import DatabaseInterface, StorageInterface
-# from create_database_interface, create_storage_interfac
+from database import create_database_interface
+from storage import create_storage_interface
+
 
 def organize_files_by_city_and_date():
     """
     Organize the files in the S3 bucket by city and date
     """
+    database = create_database_interface()
+    storage = create_storage_interface()
 
     print("TESTE - Script que agrega os arquivos .txt para .xml")
 
+    # Imprime cada resultado da query do banco de dados
+    for g in database.select("SELECT * FROM gazettes LIMIT 10;"): # Precisa do ponto e vírgula no final
+        print(g) # É a posição 7 que conetm o caminho do arquivo dentro do S3
+        print("\n")
+
+    # Deleta um arquivo do S3, funciona
+    storage.delete_file("2933604/2024")
+
+    # Faz a copia de um arquivo de um lugar para outro, funciona mas levanta uma exceção
+    storage.copy_file("2933604/2024-03-01/efbc33795f0b37296a01bad5187eb8a326dbe66b.txt", "2933604/2024/")
+
     pass
 
 if __name__ == "__main__":
diff --git a/tasks/interfaces.py b/tasks/interfaces.py
index 06b81cb..e60327b 100644
--- a/tasks/interfaces.py
+++ b/tasks/interfaces.py
@@ -56,6 +56,18 @@ def upload_content(self, file_key: str, content_to_be_uploaded: str) -> None:
         Upload the given content to the destination on the host
         """
 
+    @abc.abstractmethod
+    def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
+        """
+        Copy the given source file to the destination place on the host
+        """
+
+    @abc.abstractmethod
+    def delete_file(self, file_key: str) -> None:
+        """
+        Delete a file in the bucket S3.
+        """
+
 
 class IndexInterface(abc.ABC):
     """
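Patch 04, which follows, starts pulling gazette bodies out of the bucket into memory. The recurring pattern in the series is: download into a BytesIO, decode, close. A minimal helper, assuming a storage object created by create_storage_interface() and an illustrative file key:

    from io import BytesIO

    def read_text_object(storage, file_key: str) -> str:
        # storage.get_file writes the object's bytes into any file-like target
        buffer = BytesIO()
        storage.get_file(file_key, buffer)
        try:
            # getvalue() returns everything written, regardless of cursor position
            return buffer.getvalue().decode("utf-8")
        finally:
            buffer.close()

    # text = read_text_object(storage, "2933604/2024-03-01/efbc33795f0b3729.txt")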
+ """ + class IndexInterface(abc.ABC): """ From aa859467261d6b5aa237def820098e329acc69a8 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Wed, 5 Jun 2024 12:18:02 -0300 Subject: [PATCH 04/15] =?UTF-8?q?Cria=C3=A7=C3=A3o=20das=20primeiras=20fun?= =?UTF-8?q?=C3=A7=C3=B5es=20para=20extra=C3=A7=C3=A3o=20de=20dados=20e=20c?= =?UTF-8?q?ria=C3=A7=C3=A3o=20de=20arquivo=20xml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: csafurtado --- tasks/gazette_txt_to_xml.py | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index d59a40d..636d540 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -1,8 +1,8 @@ # from .interfaces import DatabaseInterface, StorageInterface +from io import BytesIO from database import create_database_interface from storage import create_storage_interface - def organize_files_by_city_and_date(): """ Organize the files in the S3 bucket by city and date @@ -13,16 +13,34 @@ def organize_files_by_city_and_date(): print("TESTE - Script que agrega os arquivos .txt para .xml") # Imprime cada resultado da query do banco de dados - for g in database.select("SELECT * FROM gazettes LIMIT 10;"): # Precisa do ponto e vírgula no final - print(g) # É a posição 7 que conetm o caminho do arquivo dentro do S3 - print("\n") + for g in database.select("SELECT * FROM gazettes WHERE date BETWEEN '2024-04-20' AND '2024-05-01' LIMIT 10;"): # Precisa do ponto e vírgula no final + arquivo = BytesIO() + path_arq_bucket = str(g[7]).replace(".pdf",".txt") + + storage.get_file(path_arq_bucket, arquivo) # É a posição 7 que conetm o caminho do arquivo dentro do S3 + + # print(arquivo.getvalue().decode('utf-8')) + + print("\n---------------------------------------------\n") + + # Faz a copia do arquivo txt para pasta no S3 + path_bucket_separado = path_arq_bucket.split("/") + path_bucket_separado[1] = str(g[2].year) + + path_novo_bucket = "/".join(path_bucket_separado) + print(path_novo_bucket) + storage.copy_file(path_arq_bucket, path_novo_bucket) + # Deleta um arquivo do S3, funciona - storage.delete_file("2933604/2024") + # storage.delete_file("1718808/2024/460131dd7666d09566983792e85ef8f23bbe024c.txt") + + # # Faz a copia de um arquivo de um lugar para outro, funciona mas levanta uma exceção + # storage.copy_file("1718808/2024-04-01/460131dd7666d09566983792e85ef8f23bbe024c.txt", "1718808/2024/460131dd7666d09566983792e85ef8f23bbe024c.txt") + - # Faz a copia de um arquivo de um lugar para outro, funciona mas levanta uma exceção - storage.copy_file("2933604/2024-03-01/efbc33795f0b37296a01bad5187eb8a326dbe66b.txt", "2933604/2024/") + # print(arquivo.getvalue().decode('utf-8')) pass From 187fcf849a1781d51aa177bcf67fe82fe5686964 Mon Sep 17 00:00:00 2001 From: Cristian Furtado Date: Thu, 13 Jun 2024 11:00:36 -0300 Subject: [PATCH 05/15] =?UTF-8?q?Permite=20criar=20xml=20com=20informa?= =?UTF-8?q?=C3=A7=C3=B5es=20e=20mandar=20para=20o=20bucket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: wildemberg-sales --- tasks/gazette_txt_to_xml.py | 102 +++++++++++++++++++++++++++++------- 1 file changed, 82 insertions(+), 20 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 636d540..0fdac53 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -2,6 +2,56 @@ from io import BytesIO from database import 
create_database_interface from storage import create_storage_interface +import xml.etree.cElementTree as ET +import hashlib + + +def hash_text(text): + """ + Receives a text and returns its SHA-256 hash of a text content + """ + # Cria um objeto sha256 + hasher = hashlib.sha256() + + # Atualiza o objeto com o texto codificado em UTF-8 + hasher.update(text.encode('utf-8')) + + # Obtém o hash hexadecimal + return hasher.hexdigest() + +def txt_to_xml(path_xml, txt: str, meta_info: dict, storage): + """ + Transform a .txt file into a .xml file and upload it to the storage bucket + """ + # Cria uma tag (elemento) chamado 'root' e um subelemento deste chamado 'doc' + root = ET.Element("root") + meta_info_tag = ET.SubElement(root, "meta") + + # Cria um subelemento do 'doc' chamado 'field1' e 'field2' com atributos 'name' e um texto + ET.SubElement(meta_info_tag, "data", name="dia").text = meta_info['dia'] + ET.SubElement(meta_info_tag, "data", name="mes").text = meta_info['mes'] + + ET.SubElement(meta_info_tag, "localidade", name="municipio").text = "some vlaue2" + ET.SubElement(meta_info_tag, "localidade", name="estado").text = "estado" + ET.SubElement(meta_info_tag, "criado_em").text = "criado_em" + + gazettes_tag = ET.SubElement(root, "gazettes") + + ET.SubElement(gazettes_tag, "gazette").text = txt + + # Adiciona a uma árvore de elementos XML (ou seja, o elemento 'root' onde contém todo o documento) + # e o adiciona a um arquivo binário que será enviado para o storage bucket em formato .xml + tree = ET.ElementTree(root) + + file_xml = BytesIO() + + tree.write(file_xml, encoding='utf-8', xml_declaration=True) + file_xml.seek(0) # Volta o cursor de leitura do arquivo para o começo dele + + content_file_xml = file_xml.getvalue().decode('utf-8') + + storage.upload_content(path_xml, content_file_xml) + def organize_files_by_city_and_date(): """ @@ -12,37 +62,49 @@ def organize_files_by_city_and_date(): print("TESTE - Script que agrega os arquivos .txt para .xml") + results_query = database.select("SELECT g.*, t.name, t.state_code FROM gazettes AS g\ + JOIN territories AS t ON g.territory_id = t.id;") + # Imprime cada resultado da query do banco de dados - for g in database.select("SELECT * FROM gazettes WHERE date BETWEEN '2024-04-20' AND '2024-05-01' LIMIT 10;"): # Precisa do ponto e vírgula no final - arquivo = BytesIO() - path_arq_bucket = str(g[7]).replace(".pdf",".txt") - - storage.get_file(path_arq_bucket, arquivo) # É a posição 7 que conetm o caminho do arquivo dentro do S3 + for resultado in results_query: # Precisa do ponto e vírgula no final + try: + arquivo = BytesIO() + path_arq_bucket = str(resultado[7]).replace(".pdf",".txt") # É a posição 7 que contem o caminho do arquivo dentro do S3 + + storage.get_file(path_arq_bucket, arquivo) # Pega o conteúdo do objeto do arquivo do S3 e coloca no BytesIO - # print(arquivo.getvalue().decode('utf-8')) - - print("\n---------------------------------------------\n") + dict_gazzete_info = { + "dia": str(resultado[2].day), + "mes": str(resultado[2].month), + "ano": str(resultado[2].year), + "municipio": resultado[-2], + "estado": resultado[-1], + } + + # print(arquivo.getvalue().decode('utf-8')) # Imprime o conteúdo do arquivo com codificação utf-8 - # Faz a copia do arquivo txt para pasta no S3 - path_bucket_separado = path_arq_bucket.split("/") - path_bucket_separado[1] = str(g[2].year) + print("\n---------------------------------------------\n") - path_novo_bucket = "/".join(path_bucket_separado) - print(path_novo_bucket) - 
storage.copy_file(path_arq_bucket, path_novo_bucket) + # Faz a copia do arquivo txt para pasta no S3 + path_bucket_separado = path_arq_bucket.split("/") + path_bucket_separado[1] = str(resultado[2].year) + path_novo_bucket = "/".join(path_bucket_separado) + print(path_novo_bucket) - # Deleta um arquivo do S3, funciona - # storage.delete_file("1718808/2024/460131dd7666d09566983792e85ef8f23bbe024c.txt") + storage.copy_file(path_arq_bucket, path_novo_bucket) - # # Faz a copia de um arquivo de um lugar para outro, funciona mas levanta uma exceção - # storage.copy_file("1718808/2024-04-01/460131dd7666d09566983792e85ef8f23bbe024c.txt", "1718808/2024/460131dd7666d09566983792e85ef8f23bbe024c.txt") + arquivo.close() + + except: + continue + path_xml = "/".join(path_novo_bucket.split("/")[:-1]) + f"/{dict_gazzete_info['municipio']}-{dict_gazzete_info['estado']}.xml" + print(path_xml) - # print(arquivo.getvalue().decode('utf-8')) + txt_to_xml(path_xml, "teste", dict_gazzete_info, storage) - pass if __name__ == "__main__": organize_files_by_city_and_date() \ No newline at end of file From 3c55b9c26cc0b662b25fb4d4a3e5bc9460221532 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Thu, 13 Jun 2024 13:46:10 -0300 Subject: [PATCH 06/15] =?UTF-8?q?implementa=20cria=C3=A7=C3=A3o=20xml=20pa?= =?UTF-8?q?ra=20territorio=20e=20ano?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: csafurtado Formata tags e nome de arquivos para conformidade --- tasks/gazette_txt_to_xml.py | 137 ++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 70 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 0fdac53..99ed89e 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -3,13 +3,15 @@ from database import create_database_interface from storage import create_storage_interface import xml.etree.cElementTree as ET -import hashlib +import hashlib, traceback +from datetime import datetime def hash_text(text): """ Receives a text and returns its SHA-256 hash of a text content """ + # Cria um objeto sha256 hasher = hashlib.sha256() @@ -17,94 +19,89 @@ def hash_text(text): hasher.update(text.encode('utf-8')) # Obtém o hash hexadecimal - return hasher.hexdigest() + return hasher.hexdigest() -def txt_to_xml(path_xml, txt: str, meta_info: dict, storage): +def create_xml_for_territory_and_year(territory_info:tuple, database, storage): """ - Transform a .txt file into a .xml file and upload it to the storage bucket + Create a .xml files for each year of gazettes for a territory """ - # Cria uma tag (elemento) chamado 'root' e um subelemento deste chamado 'doc' - root = ET.Element("root") - meta_info_tag = ET.SubElement(root, "meta") - # Cria um subelemento do 'doc' chamado 'field1' e 'field2' com atributos 'name' e um texto - ET.SubElement(meta_info_tag, "data", name="dia").text = meta_info['dia'] - ET.SubElement(meta_info_tag, "data", name="mes").text = meta_info['mes'] + actual_year = datetime.now().year + base_year = 1960 + + for year in range(base_year, actual_year+1): + query_content = list(database.select(f"SELECT * FROM gazettes\ + WHERE territory_id='{territory_info[0]}' AND\ + date BETWEEN '{year}-01-01' AND '{year}-12-31'\ + ORDER BY date ASC;")) + + if len(query_content) > 0: + print(f"Gerando XML para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") + root = ET.Element("root") + meta_info_tag = ET.SubElement(root, "meta") + ET.SubElement(meta_info_tag, "localidade", 
name="municipio").text = territory_info[1] + ET.SubElement(meta_info_tag, "localidade", name="estado").text = territory_info[2] + ET.SubElement(meta_info_tag, "criado_em").text = str(datetime.now()) + ET.SubElement(meta_info_tag, "ano").text = str(year) + all_gazettes_tag = ET.SubElement(root, "diarios") + + path_xml = f"{territory_info[0]}/{year}/{territory_info[1]}-{territory_info[2]}-{year}.xml" + + for gazette in query_content: + try: + file_gazette_txt = BytesIO() + path_arq_bucket = str(gazette[7]).replace(".pdf",".txt") # É a posição 7 que contem o caminho do arquivo dentro do S3 + + storage.get_file(path_arq_bucket, file_gazette_txt) + + except: + print(f"Erro na obtenção do conteúdo de texto do diário de {territory_info[1]}-{territory_info[2]}-{gazette[2]}") + continue + + gazette_tag = ET.SubElement(all_gazettes_tag, "gazette") + meta_gazette = ET.SubElement(gazette_tag, "meta-gazette") + ET.SubElement(meta_gazette, "url_pdf").text = gazette[8] + ET.SubElement(meta_gazette, "poder").text = gazette[5] + ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não' + ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há" + ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m") + ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') + + file_gazette_txt.close() + + tree = ET.ElementTree(root) - ET.SubElement(meta_info_tag, "localidade", name="municipio").text = "some vlaue2" - ET.SubElement(meta_info_tag, "localidade", name="estado").text = "estado" - ET.SubElement(meta_info_tag, "criado_em").text = "criado_em" + file_xml = BytesIO() - gazettes_tag = ET.SubElement(root, "gazettes") - - ET.SubElement(gazettes_tag, "gazette").text = txt - - # Adiciona a uma árvore de elementos XML (ou seja, o elemento 'root' onde contém todo o documento) - # e o adiciona a um arquivo binário que será enviado para o storage bucket em formato .xml - tree = ET.ElementTree(root) + tree.write(file_xml, encoding='utf-8', xml_declaration=True) + file_xml.seek(0) # Volta o cursor de leitura do arquivo para o começo dele - file_xml = BytesIO() + content_file_xml = file_xml.getvalue().decode('utf-8') - tree.write(file_xml, encoding='utf-8', xml_declaration=True) - file_xml.seek(0) # Volta o cursor de leitura do arquivo para o começo dele + storage.upload_content(path_xml, content_file_xml) - content_file_xml = file_xml.getvalue().decode('utf-8') + file_xml.close() + else: + "Teste de saida" + # print(f"Nada encontrado para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") - storage.upload_content(path_xml, content_file_xml) - +def create_xml_territories(): -def organize_files_by_city_and_date(): - """ - Organize the files in the S3 bucket by city and date - """ database = create_database_interface() storage = create_storage_interface() - print("TESTE - Script que agrega os arquivos .txt para .xml") + print("Script que agrega os arquivos .txt para .xml") - results_query = database.select("SELECT g.*, t.name, t.state_code FROM gazettes AS g\ - JOIN territories AS t ON g.territory_id = t.id;") + # results_query = database.select("SELECT * FROM territories WHERE name='Sampaio' OR name='Xique-Xique';") + results_query = database.select("SELECT * FROM territories;") - # Imprime cada resultado da query do banco de dados - for resultado in results_query: # Precisa do ponto e vírgula no final + for t in results_query: try: - arquivo = BytesIO() - path_arq_bucket 
= str(resultado[7]).replace(".pdf",".txt") # É a posição 7 que contem o caminho do arquivo dentro do S3 - - storage.get_file(path_arq_bucket, arquivo) # Pega o conteúdo do objeto do arquivo do S3 e coloca no BytesIO - - dict_gazzete_info = { - "dia": str(resultado[2].day), - "mes": str(resultado[2].month), - "ano": str(resultado[2].year), - "municipio": resultado[-2], - "estado": resultado[-1], - } - - # print(arquivo.getvalue().decode('utf-8')) # Imprime o conteúdo do arquivo com codificação utf-8 - - print("\n---------------------------------------------\n") - - # Faz a copia do arquivo txt para pasta no S3 - path_bucket_separado = path_arq_bucket.split("/") - path_bucket_separado[1] = str(resultado[2].year) - - path_novo_bucket = "/".join(path_bucket_separado) - print(path_novo_bucket) - - storage.copy_file(path_arq_bucket, path_novo_bucket) - - arquivo.close() - + create_xml_for_territory_and_year(t, database, storage) except: + print(traceback.format_exc()) continue - path_xml = "/".join(path_novo_bucket.split("/")[:-1]) + f"/{dict_gazzete_info['municipio']}-{dict_gazzete_info['estado']}.xml" - - print(path_xml) - - txt_to_xml(path_xml, "teste", dict_gazzete_info, storage) - if __name__ == "__main__": - organize_files_by_city_and_date() \ No newline at end of file + create_xml_territories() \ No newline at end of file From 7227dc0a79ef176db92501666090098ecea9d44c Mon Sep 17 00:00:00 2001 From: ArthurFerreiraRodrigues Date: Tue, 18 Jun 2024 15:35:23 -0300 Subject: [PATCH 07/15] Adiciona nome ao container e rotina de limpeza pre-start --- Makefile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 2b84a35..95bd25e 100644 --- a/Makefile +++ b/Makefile @@ -266,10 +266,16 @@ publish-tag: podman tag $(IMAGE_NAMESPACE)/$(IMAGE_NAME):${IMAGE_TAG} $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags) podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags) +.PHONY: stop-aggregate-gazettes +stop-aggregate-gazettes: + podman stop --ignore agg-gazettes + podman rm --force --ignore agg-gazettes + .PHONY: aggregate-gazettes -aggregate-gazettes: set-run-variable-values +aggregate-gazettes: stop-aggregate-gazettes set-run-variable-values podman run -ti --volume $(CURDIR):/mnt/code:rw \ --pod $(POD_NAME) \ --env PYTHONPATH=/mnt/code \ --env-file envvars \ + --name agg-gazettes \ $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python tasks/gazette_txt_to_xml.py \ No newline at end of file From a3d14414403598ae25ffb05939f91cedc4ef41e3 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Thu, 20 Jun 2024 16:03:04 -0300 Subject: [PATCH 08/15] =?UTF-8?q?Cria=C3=A7=C3=A3o=20de=20Arquivo=20zip=20?= =?UTF-8?q?do=20XML?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- storage/digital_ocean_spaces.py | 5 +-- tasks/gazette_txt_to_xml.py | 58 +++++++++++++++++++-------------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py index bfcdd99..768e7e0 100644 --- a/storage/digital_ocean_spaces.py +++ b/storage/digital_ocean_spaces.py @@ -75,15 +75,16 @@ def get_file(self, file_key: str, destination) -> None: def upload_content( self, file_key: str, - content_to_be_uploaded: str, + content_to_be_uploaded: bytes, permission: str = "public-read", ) -> None: logging.debug(f"Uploading {file_key}") - f = BytesIO(content_to_be_uploaded.encode()) + f = BytesIO(content_to_be_uploaded) self._client.upload_fileobj( f, self._bucket, 
file_key, ExtraArgs={"ACL": permission} ) + def copy_file(self, source_file_key: str, destination_file_key: str) -> None: logging.debug(f"Copying {source_file_key} to {destination_file_key}") self._client.copy_object( diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 99ed89e..0d9eb34 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -5,18 +5,18 @@ import xml.etree.cElementTree as ET import hashlib, traceback from datetime import datetime +from zipfile import ZipFile, ZIP_DEFLATED - -def hash_text(text): +def hash_text(zip): """ - Receives a text and returns its SHA-256 hash of a text content + Receives a zip and returns its SHA-256 hash of a zip content """ # Cria um objeto sha256 hasher = hashlib.sha256() - # Atualiza o objeto com o texto codificado em UTF-8 - hasher.update(text.encode('utf-8')) + # Atualiza o objeto com o zip codificado em UTF-8 + hasher.update(zip.encode('utf-8')) # Obtém o hash hexadecimal return hasher.hexdigest() @@ -45,7 +45,7 @@ def create_xml_for_territory_and_year(territory_info:tuple, database, storage): ET.SubElement(meta_info_tag, "ano").text = str(year) all_gazettes_tag = ET.SubElement(root, "diarios") - path_xml = f"{territory_info[0]}/{year}/{territory_info[1]}-{territory_info[2]}-{year}.xml" + path_xml = f"aggregates/{territory_info[0]}/{year}/{territory_info[1]}-{territory_info[2]}-{year}.xml" for gazette in query_content: try: @@ -54,35 +54,45 @@ def create_xml_for_territory_and_year(territory_info:tuple, database, storage): storage.get_file(path_arq_bucket, file_gazette_txt) + gazette_tag = ET.SubElement(all_gazettes_tag, "gazette") + meta_gazette = ET.SubElement(gazette_tag, "meta-gazette") + ET.SubElement(meta_gazette, "url_pdf").text = gazette[8] + ET.SubElement(meta_gazette, "poder").text = gazette[5] + ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não' + ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há" + ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m") + ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') + + file_gazette_txt.close() except: print(f"Erro na obtenção do conteúdo de texto do diário de {territory_info[1]}-{territory_info[2]}-{gazette[2]}") continue - - gazette_tag = ET.SubElement(all_gazettes_tag, "gazette") - meta_gazette = ET.SubElement(gazette_tag, "meta-gazette") - ET.SubElement(meta_gazette, "url_pdf").text = gazette[8] - ET.SubElement(meta_gazette, "poder").text = gazette[5] - ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não' - ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há" - ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m") - ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') - - file_gazette_txt.close() tree = ET.ElementTree(root) file_xml = BytesIO() - tree.write(file_xml, encoding='utf-8', xml_declaration=True) + tree.write(file_xml, encoding='utf8', xml_declaration=True) file_xml.seek(0) # Volta o cursor de leitura do arquivo para o começo dele - content_file_xml = file_xml.getvalue().decode('utf-8') - - storage.upload_content(path_xml, content_file_xml) - - file_xml.close() + try: + zip_buffer = BytesIO() + with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: + 
zip_file.writestr(f"{territory_info[1]}-{territory_info[2]}-{year}.xml", file_xml.getvalue()) + zip_buffer.seek(0) # Volta o cursor de leitura do zip para o começo dele + + zip_path = f"aggregates/{territory_info[0]}/{year}.zip" + + storage.upload_content(zip_path, zip_buffer.getvalue()) + + zip_buffer.close() + file_xml.close() + except Exception as e: + print(f"Erro ao criar e enviar o arquivo zip: {e}") + continue + else: - "Teste de saida" + pass # print(f"Nada encontrado para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") def create_xml_territories(): From 32039d77201d5998ea7220486e4b58c0863ec9c3 Mon Sep 17 00:00:00 2001 From: Cristian Furtado Date: Mon, 24 Jun 2024 10:54:07 -0300 Subject: [PATCH 09/15] Adiciona gerador de hash para texto do XML Co-authored-by: wildemberg-sales --- contrib/sample.env | 2 ++ storage/digital_ocean_spaces.py | 14 ++++++++++++-- tasks/gazette_txt_to_xml.py | 32 ++++++++++++++++++-------------- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/contrib/sample.env b/contrib/sample.env index 904214d..9ad30a2 100644 --- a/contrib/sample.env +++ b/contrib/sample.env @@ -23,3 +23,5 @@ QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket # Options: ALL, DAILY, UNPROCESSED EXECUTION_MODE=DAILY + +SEED_HASH=querido-diario diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py index 768e7e0..d1a1239 100644 --- a/storage/digital_ocean_spaces.py +++ b/storage/digital_ocean_spaces.py @@ -75,15 +75,25 @@ def get_file(self, file_key: str, destination) -> None: def upload_content( self, file_key: str, - content_to_be_uploaded: bytes, + content_to_be_uploaded: str, permission: str = "public-read", ) -> None: logging.debug(f"Uploading {file_key}") - f = BytesIO(content_to_be_uploaded) + f = BytesIO(content_to_be_uploaded.encode()) self._client.upload_fileobj( f, self._bucket, file_key, ExtraArgs={"ACL": permission} ) + def upload_zip( + self, + file_key: str, + content_to_be_uploaded: BytesIO, + permission: str = "public-read", + ) -> None: + logging.debug(f"Uploading {file_key}") + self._client.upload_fileobj( + content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission} + ) def copy_file(self, source_file_key: str, destination_file_key: str) -> None: logging.debug(f"Copying {source_file_key} to {destination_file_key}") diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 0d9eb34..e8651df 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -3,23 +3,26 @@ from database import create_database_interface from storage import create_storage_interface import xml.etree.cElementTree as ET -import hashlib, traceback +import hashlib, traceback, os from datetime import datetime from zipfile import ZipFile, ZIP_DEFLATED -def hash_text(zip): + +def hash_xml(content : str): """ - Receives a zip and returns its SHA-256 hash of a zip content + Receives a text content of a XML file and returns its SHA-256 hash """ - # Cria um objeto sha256 - hasher = hashlib.sha256() + seed_hash = bytes(os.environ['SEED_HASH'].encode('utf-8')) + + # Escolha o algoritmo de hash (no caso, SHA-256) + algorithm = hashlib.sha256 + result_hash = hashlib.pbkdf2_hmac(algorithm().name, content.encode('utf-8'), seed_hash, 100000) - # Atualiza o objeto com o zip codificado em UTF-8 - hasher.update(zip.encode('utf-8')) + # Converta o resultado para uma representação legível (hexadecimal) + hash_hex = result_hash.hex() - # Obtém o hash hexadecimal - return hasher.hexdigest() + 
return hash_hex def create_xml_for_territory_and_year(territory_info:tuple, database, storage): """ @@ -41,11 +44,8 @@ def create_xml_for_territory_and_year(territory_info:tuple, database, storage): meta_info_tag = ET.SubElement(root, "meta") ET.SubElement(meta_info_tag, "localidade", name="municipio").text = territory_info[1] ET.SubElement(meta_info_tag, "localidade", name="estado").text = territory_info[2] - ET.SubElement(meta_info_tag, "criado_em").text = str(datetime.now()) ET.SubElement(meta_info_tag, "ano").text = str(year) - all_gazettes_tag = ET.SubElement(root, "diarios") - - path_xml = f"aggregates/{territory_info[0]}/{year}/{territory_info[1]}-{territory_info[2]}-{year}.xml" + all_gazettes_tag = ET.SubElement(root, "diarios") for gazette in query_content: try: @@ -77,14 +77,18 @@ def create_xml_for_territory_and_year(territory_info:tuple, database, storage): try: zip_buffer = BytesIO() + with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: zip_file.writestr(f"{territory_info[1]}-{territory_info[2]}-{year}.xml", file_xml.getvalue()) + zip_buffer.seek(0) # Volta o cursor de leitura do zip para o começo dele zip_path = f"aggregates/{territory_info[0]}/{year}.zip" - storage.upload_content(zip_path, zip_buffer.getvalue()) + storage.upload_zip(zip_path, zip_buffer) + hx = hash_xml(file_xml.getvalue().decode('utf-8')) + zip_buffer.close() file_xml.close() except Exception as e: From 79a18464910824037f3e4dfc2b19f60ec61d9e44 Mon Sep 17 00:00:00 2001 From: Cristian Furtado Date: Tue, 25 Jun 2024 16:51:00 -0300 Subject: [PATCH 10/15] =?UTF-8?q?Faz=20verifica=C3=A7=C3=A3o=20de=20regist?= =?UTF-8?q?ros=20existentes=20e=20de=20hash?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: wildemberg-sales --- tasks/gazette_txt_to_xml.py | 59 +++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index e8651df..9e7d589 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -70,44 +70,79 @@ def create_xml_for_territory_and_year(territory_info:tuple, database, storage): tree = ET.ElementTree(root) - file_xml = BytesIO() + xml_file = BytesIO() - tree.write(file_xml, encoding='utf8', xml_declaration=True) - file_xml.seek(0) # Volta o cursor de leitura do arquivo para o começo dele + tree.write(xml_file, encoding='utf8', xml_declaration=True) + xml_file.seek(0) # Volta o cursor de leitura do arquivo para o começo dele + + hx = hash_xml(xml_file.getvalue().decode('utf-8')) + zip_path = f"aggregates/{territory_info[0]}/{year}.zip" + + query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \ + WHERE url_zip='{zip_path}';")) + + need_update = False + + if query_existing_aggregate: + need_update = hx != query_existing_aggregate[0][0] + + if not need_update: + xml_file.close() + continue try: zip_buffer = BytesIO() with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: - zip_file.writestr(f"{territory_info[1]}-{territory_info[2]}-{year}.xml", file_xml.getvalue()) + zip_file.writestr(f"{territory_info[1]}-{territory_info[2]}-{year}.xml", xml_file.getvalue()) - zip_buffer.seek(0) # Volta o cursor de leitura do zip para o começo dele + zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2) - zip_path = f"aggregates/{territory_info[0]}/{year}.zip" + zip_buffer.seek(0) # Volta o cursor de leitura do zip para o começo dele storage.upload_zip(zip_path, zip_buffer) - - hx = 
hash_xml(file_xml.getvalue().decode('utf-8')) + + dict_query_info = { + "territory_id" : territory_info[0], + "url_zip": zip_path, + "year": year, + "hash_info": hx, + "file_size": zip_size, + } + + if need_update: + database.insert("UPDATE aggregates SET \ + territory_id=%(territory_id)s, last_updated=NOW(), \ + hash_info=%(hash_info)s, file_size=%(file_size)s \ + WHERE url_zip=%(url_zip)s;", dict_query_info) + else: + database.insert("INSERT INTO aggregates \ + (territory_id, url_zip, year, last_updated, hash_info, file_size)\ + VALUES (%(territory_id)s, %(url_zip)s, %(year)s, NOW(), \ + %(hash_info)s, %(file_size)s);", dict_query_info) zip_buffer.close() - file_xml.close() + xml_file.close() + except Exception as e: print(f"Erro ao criar e enviar o arquivo zip: {e}") continue else: - pass - # print(f"Nada encontrado para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") + print(f"Nada encontrado para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") def create_xml_territories(): + """ + Create xml for all territories available in database + """ database = create_database_interface() storage = create_storage_interface() print("Script que agrega os arquivos .txt para .xml") - # results_query = database.select("SELECT * FROM territories WHERE name='Sampaio' OR name='Xique-Xique';") results_query = database.select("SELECT * FROM territories;") + # results_query = database.select("SELECT * FROM territories WHERE id='1718808';") for t in results_query: try: From a5c49f6c1141cc93e28fa79fd68afeadbf1b62e0 Mon Sep 17 00:00:00 2001 From: Cristian Furtado Date: Wed, 26 Jun 2024 15:20:43 -0300 Subject: [PATCH 11/15] =?UTF-8?q?Adiciona=20fun=C3=A7=C3=A3o=20para=20cria?= =?UTF-8?q?r=20tabela=20aggregates=20automaticamente?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tasks/gazette_txt_to_xml.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 9e7d589..243ba32 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -7,6 +7,18 @@ from datetime import datetime from zipfile import ZipFile, ZIP_DEFLATED +def create_aggregates_table(database): + database._commit_changes( + """ + CREATE TABLE IF NOT EXISTS aggregates ( + id SERIAL PRIMARY KEY , + territory_id VARCHAR NOT NULL, + url_zip VARCHAR(255), + year INTEGER, + last_updated TIMESTAMP, + hash_info VARCHAR(64), + file_size BIGINT + ); """) def hash_xml(content : str): """ @@ -141,8 +153,10 @@ def create_xml_territories(): print("Script que agrega os arquivos .txt para .xml") - results_query = database.select("SELECT * FROM territories;") - # results_query = database.select("SELECT * FROM territories WHERE id='1718808';") + # results_query = database.select("SELECT * FROM territories;") + results_query = database.select("SELECT * FROM territories WHERE id='1718808';") + + create_aggregates_table(database) for t in results_query: try: From fad61105d8a7b6a1d40c53f1245f355287d0dcb1 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Wed, 17 Jul 2024 10:47:25 -0300 Subject: [PATCH 12/15] =?UTF-8?q?Implementa=20agrega=C3=A7=C3=A3o=20por=20?= =?UTF-8?q?estado?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tasks/gazette_txt_to_xml.py | 289 ++++++++++++++++++++++-------------- tasks/utils/__init__.py | 3 + tasks/utils/hash.py | 32 ++++ tasks/utils/index.py | 2 +- 4 files changed, 216 insertions(+), 110 deletions(-) 
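Patch 12, whose file list appears above, keeps the aggregates table up to date by hand: it SELECTs the stored hash for a zip path and then branches into either an UPDATE or an INSERT. On PostgreSQL the same bookkeeping collapses into a single upsert; a hedged sketch, assuming a UNIQUE constraint on url_zip (the series itself only adds a unique column, file_path, in patch 14) and the database.insert helper used throughout the diffs:

    # Hypothetical single-statement variant of the SELECT-then-INSERT/UPDATE
    # pattern below; requires a UNIQUE constraint on url_zip.
    database.insert(
        """
        INSERT INTO aggregates
            (territory_id, state_code, url_zip, year, last_updated, hash_info, file_size)
        VALUES
            (%(territory_id)s, %(state_code)s, %(url_zip)s, %(year)s, NOW(),
             %(hash_info)s, %(file_size)s)
        ON CONFLICT (url_zip) DO UPDATE SET
            last_updated = NOW(),
            hash_info    = EXCLUDED.hash_info,
            file_size    = EXCLUDED.file_size;
        """,
        dict_query_info,
    )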
create mode 100644 tasks/utils/hash.py diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 243ba32..171be3e 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -1,149 +1,218 @@ -# from .interfaces import DatabaseInterface, StorageInterface from io import BytesIO from database import create_database_interface from storage import create_storage_interface import xml.etree.cElementTree as ET -import hashlib, traceback, os +import traceback from datetime import datetime from zipfile import ZipFile, ZIP_DEFLATED +from utils.hash import hash_xml, hash_zip + +need_update_zip_state = False def create_aggregates_table(database): database._commit_changes( """ CREATE TABLE IF NOT EXISTS aggregates ( id SERIAL PRIMARY KEY , - territory_id VARCHAR NOT NULL, + territory_id VARCHAR, + state_code VARCHAR NOT NULL, url_zip VARCHAR(255), year INTEGER, last_updated TIMESTAMP, hash_info VARCHAR(64), - file_size BIGINT + file_size REAL ); """) -def hash_xml(content : str): +def xml_content_generate(gazzetes_query_content:list, root, territory_id, storage): + all_gazettes_tag = ET.SubElement(root, "diarios") + + for gazette in gazzetes_query_content: + try: + file_gazette_txt = BytesIO() + path_arq_bucket = str(gazette[7]) + + if path_arq_bucket.endswith(".pdf"): + path_arq_bucket = path_arq_bucket.replace(".pdf", ".txt") + else: + path_arq_bucket = path_arq_bucket + ".txt" + + storage.get_file(path_arq_bucket, file_gazette_txt) + + gazette_tag = ET.SubElement(all_gazettes_tag, "gazette") + meta_gazette = ET.SubElement(gazette_tag, "meta-gazette") + ET.SubElement(meta_gazette, "url_pdf").text = gazette[8] + ET.SubElement(meta_gazette, "poder").text = gazette[5] + ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não' + ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há" + ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m") + ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') + + file_gazette_txt.close() + except: + print(f"Erro na obtenção do conteúdo de texto do diário do territorio {territory_id}") + continue + +def create_zip_for_state(xml_arr:list, year, state_code, database, storage): + """ + Creating .zip files for the state with all its territories + """ + + print(f"Gerando ZIP do estado {state_code} no ano {year}") + + zip_path = f"aggregates/{state_code}/{state_code}-{year}.zip" + + zip_buffer = BytesIO() + + with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: + for xml_file in xml_arr: + zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'].getvalue()) + + zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2) + zip_buffer.seek(0) + zip_buffer_copy = BytesIO(zip_buffer.getvalue()) + zip_buffer_copy.seek(0) + storage.upload_zip(zip_path, zip_buffer) + + hx = hash_zip(zip_buffer_copy.read()) + + dict_query_info = { + "state_code" : state_code, + "territory_id" : None, + "url_zip": zip_path, + "year": year, + "hash_info": hx, + "file_size": zip_size, + } + + query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \ + WHERE url_zip='{zip_path}';")) + + if query_existing_aggregate and hx != query_existing_aggregate[0][0]: + database.insert("UPDATE aggregates SET \ + state_code=%(state_code)s, last_updated=NOW(), \ + hash_info=%(hash_info)s, file_size=%(file_size)s \ + WHERE url_zip=%(url_zip)s;", dict_query_info) + 
else: + database.insert("INSERT INTO aggregates \ + (territory_id, state_code, url_zip, year, last_updated, hash_info, file_size)\ + VALUES (%(territory_id)s, %(state_code)s, %(url_zip)s, %(year)s, NOW(), \ + %(hash_info)s, %(file_size)s);", dict_query_info) + + zip_buffer.close() + +def create_zip_for_territory(xml_arr:list, database, storage): """ - Receives a text content of a XML file and returns its SHA-256 hash + Creating .zip files for the year's territories """ - seed_hash = bytes(os.environ['SEED_HASH'].encode('utf-8')) + global need_update_zip_state + need_update_zip_state = False - # Escolha o algoritmo de hash (no caso, SHA-256) - algorithm = hashlib.sha256 - result_hash = hashlib.pbkdf2_hmac(algorithm().name, content.encode('utf-8'), seed_hash, 100000) + for xml_file in xml_arr: + try: + hx = hash_xml(xml_file['xml'].getvalue().decode('utf-8')) + zip_path = f"aggregates/{xml_file['state_code']}/{xml_file['territory_id']}-{xml_file['year']}.zip" + + query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \ + WHERE url_zip='{zip_path}';")) + + need_update = False - # Converta o resultado para uma representação legível (hexadecimal) - hash_hex = result_hash.hex() + if query_existing_aggregate: + need_update = hx != query_existing_aggregate[0][0] + if not need_update: + continue - return hash_hex + need_update_zip_state = True -def create_xml_for_territory_and_year(territory_info:tuple, database, storage): + zip_buffer = BytesIO() + + with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: + zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'].getvalue()) + + zip_size = round(zip_buffer.tell() / (1024 * 1024), 2) + zip_buffer.seek(0) + + storage.upload_zip(zip_path, zip_buffer) + + dict_query_info = { + "state_code" : xml_file['state_code'], + "territory_id" : xml_file['territory_id'], + "url_zip": zip_path, + "year": xml_file['year'], + "hash_info": hx, + "file_size": zip_size, + } + + if need_update: + database.insert("UPDATE aggregates SET \ + territory_id=%(territory_id)s, state_code=%(state_code)s, last_updated=NOW(), \ + hash_info=%(hash_info)s, file_size=%(file_size)s \ + WHERE url_zip=%(url_zip)s;", dict_query_info) + else: + database.insert("INSERT INTO aggregates \ + (territory_id, state_code, url_zip, year, last_updated, hash_info, file_size)\ + VALUES (%(territory_id)s, %(state_code)s, %(url_zip)s, %(year)s, NOW(), \ + %(hash_info)s, %(file_size)s);", dict_query_info) + + zip_buffer.close() + except Exception as e: + print(traceback.format_exc()) + +def create_aggregates_for_territories_and_states(territories_list:list, state, database, storage): """ Create a .xml files for each year of gazettes for a territory """ - actual_year = datetime.now().year base_year = 1960 for year in range(base_year, actual_year+1): - query_content = list(database.select(f"SELECT * FROM gazettes\ - WHERE territory_id='{territory_info[0]}' AND\ - date BETWEEN '{year}-01-01' AND '{year}-12-31'\ - ORDER BY date ASC;")) + xml_files_arr = [] - if len(query_content) > 0: - print(f"Gerando XML para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") + for territories in territories_list: root = ET.Element("root") - meta_info_tag = ET.SubElement(root, "meta") - ET.SubElement(meta_info_tag, "localidade", name="municipio").text = territory_info[1] - ET.SubElement(meta_info_tag, "localidade", name="estado").text = territory_info[2] - ET.SubElement(meta_info_tag, "ano").text = str(year) - all_gazettes_tag = ET.SubElement(root, 
"diarios") - - for gazette in query_content: - try: - file_gazette_txt = BytesIO() - path_arq_bucket = str(gazette[7]).replace(".pdf",".txt") # É a posição 7 que contem o caminho do arquivo dentro do S3 - - storage.get_file(path_arq_bucket, file_gazette_txt) - - gazette_tag = ET.SubElement(all_gazettes_tag, "gazette") - meta_gazette = ET.SubElement(gazette_tag, "meta-gazette") - ET.SubElement(meta_gazette, "url_pdf").text = gazette[8] - ET.SubElement(meta_gazette, "poder").text = gazette[5] - ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não' - ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há" - ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m") - ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8') - - file_gazette_txt.close() - except: - print(f"Erro na obtenção do conteúdo de texto do diário de {territory_info[1]}-{territory_info[2]}-{gazette[2]}") - continue - - tree = ET.ElementTree(root) - xml_file = BytesIO() - tree.write(xml_file, encoding='utf8', xml_declaration=True) - xml_file.seek(0) # Volta o cursor de leitura do arquivo para o começo dele + gazzetes_query_content = list(database.select(f"SELECT * FROM gazettes\ + WHERE territory_id='{territories[0]}' AND\ + date BETWEEN '{year}-01-01' AND '{year}-12-31'\ + ORDER BY date ASC;")) - hx = hash_xml(xml_file.getvalue().decode('utf-8')) - zip_path = f"aggregates/{territory_info[0]}/{year}.zip" + if gazzetes_query_content: + print(f"Gerando XML para cidade {territories[1]}-{territories[2]} no ano {year}") - query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \ - WHERE url_zip='{zip_path}';")) + meta_info_tag = ET.SubElement(root, "meta") + ET.SubElement(meta_info_tag, "localidade", name="estado").text = territories[2] + ET.SubElement(meta_info_tag, "ano").text = str(year) + ET.SubElement(meta_info_tag, "localidade", name="municipio").text = territories[1] + ET.SubElement(meta_info_tag, "codigo_ibge", name="codigo_ibge").text = territories[0] - need_update = False + xml_content_generate(gazzetes_query_content, root, territories[0], storage) - if query_existing_aggregate: - need_update = hx != query_existing_aggregate[0][0] + tree = ET.ElementTree(root) + tree.write(xml_file, encoding='utf8', xml_declaration=True) + xml_file.seek(0) - if not need_update: - xml_file.close() - continue - - try: - zip_buffer = BytesIO() - - with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file: - zip_file.writestr(f"{territory_info[1]}-{territory_info[2]}-{year}.xml", xml_file.getvalue()) - - zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2) - - zip_buffer.seek(0) # Volta o cursor de leitura do zip para o começo dele - - storage.upload_zip(zip_path, zip_buffer) - - dict_query_info = { - "territory_id" : territory_info[0], - "url_zip": zip_path, - "year": year, - "hash_info": hx, - "file_size": zip_size, + meta_xml={ + "xml":xml_file, + "territory_id":territories[0], + "state_code":territories[2], + "year":year } - if need_update: - database.insert("UPDATE aggregates SET \ - territory_id=%(territory_id)s, last_updated=NOW(), \ - hash_info=%(hash_info)s, file_size=%(file_size)s \ - WHERE url_zip=%(url_zip)s;", dict_query_info) - else: - database.insert("INSERT INTO aggregates \ - (territory_id, url_zip, year, last_updated, hash_info, file_size)\ - VALUES (%(territory_id)s, %(url_zip)s, %(year)s, NOW(), \ - %(hash_info)s, %(file_size)s);", 
dict_query_info) - - zip_buffer.close() - xml_file.close() - - except Exception as e: - print(f"Erro ao criar e enviar o arquivo zip: {e}") - continue - - else: - print(f"Nada encontrado para cidade {territory_info[1]}-{territory_info[2]} no ano {year}") + xml_files_arr.append(meta_xml) + else: + # print(f"Nada encontrado para cidade {territories[1]}-{territories[2]} no ano {year}") + pass + + create_zip_for_territory(xml_files_arr, database, storage) -def create_xml_territories(): + if need_update_zip_state: + create_zip_for_state(xml_files_arr, year, state, database, storage) + + xml_file.close() + +def create_aggregates(): """ Create xml for all territories available in database """ @@ -151,20 +220,22 @@ def create_xml_territories(): database = create_database_interface() storage = create_storage_interface() - print("Script que agrega os arquivos .txt para .xml") + print("========== Script que agrega os arquivos .txt para .xml de territórios e estados ==========") + + results_query_states = database.select("SELECT DISTINCT state_code FROM territories ORDER BY state_code ASC;") - # results_query = database.select("SELECT * FROM territories;") - results_query = database.select("SELECT * FROM territories WHERE id='1718808';") + for res in results_query_states: + + results_query_territories = database.select(f"SELECT * FROM territories WHERE state_code='{res[0]}';") - create_aggregates_table(database) + create_aggregates_table(database) - for t in results_query: try: - create_xml_for_territory_and_year(t, database, storage) + create_aggregates_for_territories_and_states(list(results_query_territories), res[0], database, storage) except: print(traceback.format_exc()) continue if __name__ == "__main__": - create_xml_territories() \ No newline at end of file + create_aggregates() \ No newline at end of file diff --git a/tasks/utils/__init__.py b/tasks/utils/__init__.py index 129d85e..bbeee0d 100644 --- a/tasks/utils/__init__.py +++ b/tasks/utils/__init__.py @@ -13,3 +13,6 @@ get_territory_slug, get_territory_data, ) +from .hash import ( + hash_xml, +) \ No newline at end of file diff --git a/tasks/utils/hash.py b/tasks/utils/hash.py new file mode 100644 index 0000000..8713743 --- /dev/null +++ b/tasks/utils/hash.py @@ -0,0 +1,32 @@ +import hashlib, os + +def hash_xml(content : str): + """ + Receives a text content of a XML file and returns its SHA-256 hash + """ + + seed_hash = bytes(os.environ['SEED_HASH'].encode('utf-8')) + + # Escolha o algoritmo de hash (no caso, SHA-256) + algorithm = hashlib.sha256 + result_hash = hashlib.pbkdf2_hmac(algorithm().name, content.encode('utf-8'), seed_hash, 100000) + + # Converta o resultado para uma representação legível (hexadecimal) + hash_hex = result_hash.hex() + + return hash_hex + +def hash_zip(zip_content: bytes) -> str: + """ + Receives the content of a zip file and returns its SHA-256 hash. 
+ """ + seed_hash = bytes(os.environ.get('SEED_HASH', 'default_seed').encode('utf-8')) + + # Escolha o algoritmo de hash (no caso, SHA-256) + algorithm = hashlib.sha256 + result_hash = hashlib.pbkdf2_hmac(algorithm().name, zip_content, seed_hash, 100000) + + # Converta o resultado para uma representação legível (hexadecimal) + hash_hex = result_hash.hex() + + return hash_hex \ No newline at end of file diff --git a/tasks/utils/index.py b/tasks/utils/index.py index 83d769c..683ec29 100644 --- a/tasks/utils/index.py +++ b/tasks/utils/index.py @@ -1,6 +1,6 @@ from typing import Dict, Iterable, List -from ..interfaces import IndexInterface +from tasks.interfaces import IndexInterface def get_documents_with_ids( From c24433e2bf39fe43d5961f34e0a400808aa2484d Mon Sep 17 00:00:00 2001 From: Cristian Furtado Date: Wed, 17 Jul 2024 13:52:13 -0300 Subject: [PATCH 13/15] =?UTF-8?q?Adiciona=20formata=C3=A7=C3=A3o=20no=20ar?= =?UTF-8?q?quivo=20XML=20gerado?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tasks/gazette_txt_to_xml.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py index 171be3e..66830bd 100644 --- a/tasks/gazette_txt_to_xml.py +++ b/tasks/gazette_txt_to_xml.py @@ -6,6 +6,8 @@ from datetime import datetime from zipfile import ZipFile, ZIP_DEFLATED from utils.hash import hash_xml, hash_zip +from xml.dom import minidom + need_update_zip_state = False @@ -189,8 +191,12 @@ def create_aggregates_for_territories_and_states(territories_list:list, state, d xml_content_generate(gazzetes_query_content, root, territories[0], storage) - tree = ET.ElementTree(root) - tree.write(xml_file, encoding='utf8', xml_declaration=True) + # Format XML file + xml_str = ET.tostring(root, encoding='unicode') + format_xml = minidom.parseString(xml_str).toprettyxml(indent=" ") + xml_bytes = format_xml.encode('utf-8') + + xml_file.write(xml_bytes) xml_file.seek(0) meta_xml={ From eede142d385fa4b5bd93bc61b23a847dd7487b13 Mon Sep 17 00:00:00 2001 From: Giulio Date: Mon, 22 Jul 2024 17:35:54 -0300 Subject: [PATCH 14/15] Adiciona CLI para escolher pipeline a executar MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLI permite que o pipeline principal seja executado sem argumento e também que a task de `create_aggregates` seja executada por meio do pipeline `aggregates`. Modifica funções de hash e upload para receberem str e bytes Modifica tabela aggregates e sua inicialização Refatoração da lógica de busca de territorios/ano e geração de zips Co-authored-by: csafurtado Refatoração das funções de agregação através de nova query com inserção de upsert. 
diff --git a/Makefile b/Makefile
index 95bd25e..297651c 100644
--- a/Makefile
+++ b/Makefile
@@ -278,4 +278,4 @@ aggregate-gazettes: stop-aggregate-gazettes set-run-variable-values
 		--env PYTHONPATH=/mnt/code \
 		--env-file envvars \
 		--name agg-gazettes \
-		$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python tasks/gazette_txt_to_xml.py
\ No newline at end of file
+		$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python main -p aggregates
diff --git a/main/__main__.py b/main/__main__.py
index e8dd5ec..5f9951c 100644
--- a/main/__main__.py
+++ b/main/__main__.py
@@ -1,4 +1,5 @@
 from os import environ
+import argparse
 import logging
 
 from data_extraction import create_apache_tika_text_extraction
@@ -6,7 +7,9 @@
 from storage import create_storage_interface
 from index import create_index_interface
 from tasks import (
+    create_aggregates,
     create_gazettes_index,
+    create_aggregates_table,
     create_themed_excerpts_index,
     embedding_rerank_excerpts,
     extract_text_from_gazettes,
@@ -35,9 +38,7 @@ def get_execution_mode():
     return environ.get("EXECUTION_MODE", "DAILY")
 
 
-def execute_pipeline():
-    enable_debug_if_necessary()
-
+def gazette_texts_pipeline():
     execution_mode = get_execution_mode()
     database = create_database_interface()
     storage = create_storage_interface()
@@ -61,5 +62,27 @@
         tag_entities_in_excerpts(theme, themed_excerpt_ids, index)
 
 
+def aggregates_pipeline():
+    database = create_database_interface()
+    storage = create_storage_interface()
+
+    create_aggregates_table(database)
+    create_aggregates(database, storage)
+
+
+def execute_pipeline(pipeline):
+    enable_debug_if_necessary()
+
+    if not pipeline or pipeline == "gazette_texts":
+        gazette_texts_pipeline()
+    elif pipeline == "aggregates":
+        aggregates_pipeline()
+    else:
+        raise ValueError("Pipeline inválido.")
+
+
 if __name__ == "__main__":
-    execute_pipeline()
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-p", "--pipeline", help="Qual pipeline deve ser executado.")
+    args = parser.parse_args()
+    execute_pipeline(args.pipeline)
diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py
index d1a1239..792abd5 100644
--- a/storage/digital_ocean_spaces.py
+++ b/storage/digital_ocean_spaces.py
@@ -1,7 +1,8 @@
 import logging
 import os
-from typing import Generator
+from typing import Generator, Union
 from io import BytesIO
+from pathlib import Path
 
 import boto3
@@ -68,32 +69,27 @@ def __init__(
             aws_secret_access_key=self._access_secret,
         )
 
-    def get_file(self, file_key: str, destination) -> None:
-        logging.debug(f"Getting {file_key}")
-        self._client.download_fileobj(self._bucket, file_key, destination)
+    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
+        logging.debug(f"Getting {file_to_be_downloaded}")
+        self._client.download_fileobj(self._bucket, str(file_to_be_downloaded), destination)
 
     def upload_content(
         self,
         file_key: str,
-        content_to_be_uploaded: str,
+        content_to_be_uploaded: Union[str, BytesIO],
         permission: str
diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py
index d1a1239..792abd5 100644
--- a/storage/digital_ocean_spaces.py
+++ b/storage/digital_ocean_spaces.py
@@ -1,7 +1,8 @@
 import logging
 import os
-from typing import Generator
+from typing import Generator, Union
 from io import BytesIO
+from pathlib import Path

 import boto3

@@ -68,32 +69,27 @@ def __init__(
             aws_secret_access_key=self._access_secret,
         )

-    def get_file(self, file_key: str, destination) -> None:
-        logging.debug(f"Getting {file_key}")
-        self._client.download_fileobj(self._bucket, file_key, destination)
+    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
+        logging.debug(f"Getting {file_to_be_downloaded}")
+        self._client.download_fileobj(self._bucket, str(file_to_be_downloaded), destination)

     def upload_content(
         self,
         file_key: str,
-        content_to_be_uploaded: str,
+        content_to_be_uploaded: Union[str, BytesIO],
         permission: str = "public-read",
     ) -> None:
         logging.debug(f"Uploading {file_key}")
-        f = BytesIO(content_to_be_uploaded.encode())
-        self._client.upload_fileobj(
-            f, self._bucket, file_key, ExtraArgs={"ACL": permission}
-        )

-    def upload_zip(
-        self,
-        file_key: str,
-        content_to_be_uploaded: BytesIO,
-        permission: str = "public-read",
-    ) -> None:
-        logging.debug(f"Uploading {file_key}")
-        self._client.upload_fileobj(
+        if isinstance(content_to_be_uploaded, str):
+            f = BytesIO(content_to_be_uploaded.encode())
+            self._client.upload_fileobj(
+                f, self._bucket, file_key, ExtraArgs={"ACL": permission}
+            )
+        else:
+            self._client.upload_fileobj(
                 content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission}
-        )
+            )

     def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
         logging.debug(f"Copying {source_file_key} to {destination_file_key}")
@@ -104,5 +100,5 @@ def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
         )

     def delete_file(self, file_key: str) -> None:
-        logging.debug(f"Deletando {file_key}")
+        logging.debug(f"Deleting {file_key}")
         self._client.delete_object(Bucket=self._bucket, Key=file_key)
diff --git a/tasks/__init__.py b/tasks/__init__.py
index 63fd625..643a257 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -1,9 +1,11 @@
 from .create_index import create_gazettes_index, create_themed_excerpts_index
+from .create_aggregates_table import create_aggregates_table
 from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
 from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
 from .gazette_text_extraction import extract_text_from_gazettes
 from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes
 from .gazette_themes_listing import get_themes
+from .gazette_txt_to_xml import create_aggregates
 from .interfaces import (
     DatabaseInterface,
     StorageInterface,
diff --git a/tasks/create_aggregates_table.py b/tasks/create_aggregates_table.py
new file mode 100644
index 0000000..b28dc0e
--- /dev/null
+++ b/tasks/create_aggregates_table.py
@@ -0,0 +1,18 @@
+from .interfaces import DatabaseInterface
+
+
+def create_aggregates_table(database: DatabaseInterface):
+    database._commit_changes(
+        """
+        CREATE TABLE IF NOT EXISTS aggregates (
+            id SERIAL PRIMARY KEY ,
+            territory_id VARCHAR,
+            state_code VARCHAR NOT NULL,
+            year INTEGER,
+            file_path VARCHAR(255) UNIQUE,
+            file_size_mb REAL,
+            hash_info VARCHAR(64),
+            last_updated TIMESTAMP
+        ); """)
+
\ No newline at end of file
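create_aggregates_table() can run at the start of every execution because CREATE TABLE IF NOT EXISTS is idempotent. A hedged illustration of that property, using sqlite3 only so the snippet runs anywhere; the project issues this DDL against Postgres, where the id column is SERIAL:

import sqlite3

ddl = """
CREATE TABLE IF NOT EXISTS aggregates (
    id INTEGER PRIMARY KEY,  -- stands in for Postgres SERIAL
    territory_id VARCHAR,
    state_code VARCHAR NOT NULL,
    year INTEGER,
    file_path VARCHAR(255) UNIQUE,
    file_size_mb REAL,
    hash_info VARCHAR(64),
    last_updated TIMESTAMP
);
"""

conn = sqlite3.connect(":memory:")
conn.execute(ddl)
conn.execute(ddl)  # second run is a no-op, not an error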
diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py
index 66830bd..33665f1 100644
--- a/tasks/gazette_txt_to_xml.py
+++ b/tasks/gazette_txt_to_xml.py
@@ -1,247 +1,236 @@
-from io import BytesIO
-from database import create_database_interface
-from storage import create_storage_interface
-import xml.etree.cElementTree as ET
 import traceback
+import xml.etree.cElementTree as ET
+import logging
 from datetime import datetime
-from zipfile import ZipFile, ZIP_DEFLATED
-from utils.hash import hash_xml, hash_zip
+from io import BytesIO
 from xml.dom import minidom
+from zipfile import ZipFile, ZIP_DEFLATED
+from pathlib import Path

+from .utils import hash_content, zip_needs_upsert, get_territory_slug
+from .interfaces import StorageInterface,DatabaseInterface

-need_update_zip_state = False
+from botocore.exceptions import ClientError

-def create_aggregates_table(database):
-    database._commit_changes(
-        """
-        CREATE TABLE IF NOT EXISTS aggregates (
-            id SERIAL PRIMARY KEY ,
-            territory_id VARCHAR,
-            state_code VARCHAR NOT NULL,
-            url_zip VARCHAR(255),
-            year INTEGER,
-            last_updated TIMESTAMP,
-            hash_info VARCHAR(64),
-            file_size REAL
-        ); """)
-
-def xml_content_generate(gazzetes_query_content:list, root, territory_id, storage):
-    all_gazettes_tag = ET.SubElement(root, "diarios")
+need_update_zip_state = False
+logger = logging.getLogger(__name__)

-    for gazette in gazzetes_query_content:
-        try:
-            file_gazette_txt = BytesIO()
-            path_arq_bucket = str(gazette[7])
-
-            if path_arq_bucket.endswith(".pdf"):
-                path_arq_bucket = path_arq_bucket.replace(".pdf", ".txt")
-            else:
-                path_arq_bucket = path_arq_bucket + ".txt"
-
-            storage.get_file(path_arq_bucket, file_gazette_txt)
+def create_aggregates(database:DatabaseInterface, storage:StorageInterface):
+    """
+    Create xml for all territories available in database
+    """
+    logger.info("Agregando os arquivos TXT para XML de territórios e estados...")
+
-            gazette_tag = ET.SubElement(all_gazettes_tag, "gazette")
-            meta_gazette = ET.SubElement(gazette_tag, "meta-gazette")
-            ET.SubElement(meta_gazette, "url_pdf").text = gazette[8]
-            ET.SubElement(meta_gazette, "poder").text = gazette[5]
-            ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette[4] else 'Não'
-            ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette[3]) if str(gazette[3]) is not None else "Não há"
-            ET.SubElement(meta_gazette, "data_diario").text = datetime.strftime(gazette[2], "%d/%m")
-            ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8')
+    results_query_states = list(database.select(f"SELECT t.state_code AS code, json_agg(json_build_object('id',t.id, 'name',t.name)) FROM territories t GROUP BY code;"))

-            file_gazette_txt.close()
-        except:
-            print(f"Erro na obtenção do conteúdo de texto do diário do territorio {territory_id}")
+    for state, territories_list in results_query_states:
+        try:
+            create_aggregates_for_territories_and_states(territories_list, state, database, storage)
+        except Exception as e:
+            logger.error(f"Erro ao tentar processar municípios de {state}: {e}\n{traceback.format_exc()}")
+            continue
+
-def create_zip_for_state(xml_arr:list, year, state_code, database, storage):
+def create_aggregates_for_territories_and_states(territories_list:list, state:str, database:DatabaseInterface, storage:StorageInterface):
     """
-    Creating .zip files for the state with all its territories
+    Create a .xml files for each year of gazettes for a territory
     """
-
-    print(f"Gerando ZIP do estado {state_code} no ano {year}")
-    zip_path = f"aggregates/{state_code}/{state_code}-{year}.zip"
+    xml_files_dict = {}
+    arr_years_update = []
+
+    for territory in territories_list:
+        query_content_gazzetes_for_territory = list(database.select(f"""SELECT
+                date_part('Year', g.date) AS year,
+                json_agg(g.*)
+            FROM
+                gazettes g
+            WHERE
+                g.territory_id='{territory['id']}'
+            GROUP BY
+                year
+            """
+        ))
+
-    zip_buffer = BytesIO()
+        for year, list_gazzetes_content in query_content_gazzetes_for_territory:
+            year = str(int(year))
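# Illustration only, not part of the patch: because both queries above use
# json_agg, each row yielded by database.select() already carries its second
# column as a Python list of dicts once the driver decodes the JSON, e.g.
#
#     ("SP", [{"id": "3550308", "name": "São Paulo"}, ...])
#
# for the states query, while the per-territory query groups gazettes under a
# float year from date_part (2024.0) -- hence the str(int(year)) normalization.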
-    with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
-        for xml_file in xml_arr:
-            zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'].getvalue())
-
-    zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2)
-    zip_buffer.seek(0)
-    zip_buffer_copy = BytesIO(zip_buffer.getvalue())
-    zip_buffer_copy.seek(0)
-    storage.upload_zip(zip_path, zip_buffer)
+            meta_xml = xml_content_generate(state, year, territory, list_gazzetes_content, storage)

-    hx = hash_zip(zip_buffer_copy.read())
+            if year not in xml_files_dict:
+                xml_files_dict[year] = []

-    dict_query_info = {
-        "state_code" : state_code,
-        "territory_id" : None,
-        "url_zip": zip_path,
-        "year": year,
-        "hash_info": hx,
-        "file_size": zip_size,
-    }
+            xml_files_dict[year].append(meta_xml)

-    query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \
-        WHERE url_zip='{zip_path}';"))
+            territory_slug = get_territory_slug(territory['name'], state)
+            zip_path = f"aggregates/{meta_xml['state_code']}/{territory_slug}_{meta_xml['territory_id']}_{meta_xml['year']}.zip"
+            hx = hash_content(meta_xml['xml'])

-    if query_existing_aggregate and hx != query_existing_aggregate[0][0]:
-        database.insert("UPDATE aggregates SET \
-            state_code=%(state_code)s, last_updated=NOW(), \
-            hash_info=%(hash_info)s, file_size=%(file_size)s \
-            WHERE url_zip=%(url_zip)s;", dict_query_info)
-    else:
-        database.insert("INSERT INTO aggregates \
-            (territory_id, state_code, url_zip, year, last_updated, hash_info, file_size)\
-            VALUES (%(territory_id)s, %(state_code)s, %(url_zip)s, %(year)s, NOW(), \
-            %(hash_info)s, %(file_size)s);", dict_query_info)
+            need_update_territory_zip = zip_needs_upsert(hx, zip_path, database)

-    zip_buffer.close()
+            if need_update_territory_zip:
+                if year not in arr_years_update:
+                    arr_years_update.append(year)
+                create_zip_for_territory(hx, zip_path, meta_xml, database, storage)

+    if arr_years_update:
+        create_zip_for_state(xml_files_dict, arr_years_update, state, database, storage)
+
+
-def create_zip_for_territory(xml_arr:list, database, storage):
+def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code, database:DatabaseInterface, storage:StorageInterface):
     """
-    Creating .zip files for the year's territories
+    Creating .zip files for the state with all its territories
     """
-    global need_update_zip_state
-    need_update_zip_state = False
+    for year in arr_years_update:
+        logger.info(f"Gerando ZIP do estado {state_code} no ano {year}")
+
+        xmls = xmls_years_dict[year]
+
+        zip_path = f"aggregates/{state_code}/{state_code}_{year}.zip"

-    for xml_file in xml_arr:
-        try:
-            hx = hash_xml(xml_file['xml'].getvalue().decode('utf-8'))
-            zip_path = f"aggregates/{xml_file['state_code']}/{xml_file['territory_id']}-{xml_file['year']}.zip"
-
-            query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \
-                WHERE url_zip='{zip_path}';"))
-
-            need_update = False
+        zip_buffer = BytesIO()

-            if query_existing_aggregate:
-                need_update = hx != query_existing_aggregate[0][0]
-                if not need_update:
-                    continue
+        with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
+            for xml_file in xmls:
+                zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'])
+
-            need_update_zip_state = True
+        zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2)
+        zip_buffer.seek(0)
+        zip_buffer_copy = BytesIO(zip_buffer.getvalue())
+        zip_buffer_copy.seek(0)
+        storage.upload_content(zip_path, zip_buffer)

-            zip_buffer = BytesIO()
+        hx = hash_content(zip_buffer_copy.read())
+
+        dict_query_info = {
+            "state_code" : state_code,
+            "territory_id" : None,
+            "file_path": zip_path,
+            "year": year,
+            "hash_info": hx,
+            "file_size_mb": zip_size,
+        }

+        database.insert("INSERT INTO aggregates \
+            (territory_id, state_code, year, file_path, \
+            file_size_mb, hash_info, last_updated) \
+            VALUES (%(territory_id)s, %(state_code)s, \
+            %(year)s, %(file_path)s, %(file_size_mb)s, \
+            %(hash_info)s, NOW()) \
+            ON CONFLICT(file_path) \
+            DO UPDATE \
+            SET state_code = EXCLUDED.state_code, last_updated=NOW(), \
+            hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)
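# The INSERT ... ON CONFLICT(file_path) DO UPDATE above turns two code paths
# (first build vs. rebuild of the same zip) into one statement, keyed on the
# UNIQUE file_path column. A runnable sketch of the same upsert semantics,
# on sqlite3 only because it ships with Python; the project runs this
# against Postgres:
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE aggregates (file_path TEXT UNIQUE, hash_info TEXT)")
upsert = ("INSERT INTO aggregates (file_path, hash_info) VALUES (?, ?) "
          "ON CONFLICT(file_path) DO UPDATE SET hash_info = excluded.hash_info")
conn.execute(upsert, ("aggregates/SP/SP_2024.zip", "aaa"))
conn.execute(upsert, ("aggregates/SP/SP_2024.zip", "bbb"))  # updates in place
assert conn.execute("SELECT hash_info FROM aggregates").fetchone()[0] == "bbb"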
+
+        zip_buffer.close()

-            with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
-                zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'].getvalue())
-
-            zip_size = round(zip_buffer.tell() / (1024 * 1024), 2)
-            zip_buffer.seek(0)
-
-            storage.upload_zip(zip_path, zip_buffer)
-
-            dict_query_info = {
-                "state_code" : xml_file['state_code'],
-                "territory_id" : xml_file['territory_id'],
-                "url_zip": zip_path,
-                "year": xml_file['year'],
-                "hash_info": hx,
-                "file_size": zip_size,
-            }
-
-            if need_update:
-                database.insert("UPDATE aggregates SET \
-                    territory_id=%(territory_id)s, state_code=%(state_code)s, last_updated=NOW(), \
-                    hash_info=%(hash_info)s, file_size=%(file_size)s \
-                    WHERE url_zip=%(url_zip)s;", dict_query_info)
-            else:
-                database.insert("INSERT INTO aggregates \
-                    (territory_id, state_code, url_zip, year, last_updated, hash_info, file_size)\
-                    VALUES (%(territory_id)s, %(state_code)s, %(url_zip)s, %(year)s, NOW(), \
-                    %(hash_info)s, %(file_size)s);", dict_query_info)
-
-            zip_buffer.close()
-        except Exception as e:
-            print(traceback.format_exc())

-def create_aggregates_for_territories_and_states(territories_list:list, state, database, storage):
+def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:DatabaseInterface, storage:StorageInterface):
     """
-    Create a .xml files for each year of gazettes for a territory
+    Creating .zip files for the year's territories
     """
-    actual_year = datetime.now().year
-    base_year = 1960
-
-    for year in range(base_year, actual_year+1):
-        xml_files_arr = []
-
-        for territories in territories_list:
-            root = ET.Element("root")
-            xml_file = BytesIO()
-
-            gazzetes_query_content = list(database.select(f"SELECT * FROM gazettes\
-                WHERE territory_id='{territories[0]}' AND\
-                date BETWEEN '{year}-01-01' AND '{year}-12-31'\
-                ORDER BY date ASC;"))
-
-            if gazzetes_query_content:
-                print(f"Gerando XML para cidade {territories[1]}-{territories[2]} no ano {year}")
-
-                meta_info_tag = ET.SubElement(root, "meta")
-                ET.SubElement(meta_info_tag, "localidade", name="estado").text = territories[2]
-                ET.SubElement(meta_info_tag, "ano").text = str(year)
-                ET.SubElement(meta_info_tag, "localidade", name="municipio").text = territories[1]
-                ET.SubElement(meta_info_tag, "codigo_ibge", name="codigo_ibge").text = territories[0]
-
-                xml_content_generate(gazzetes_query_content, root, territories[0], storage)
-
-                # Format XML file
-                xml_str = ET.tostring(root, encoding='unicode')
-                format_xml = minidom.parseString(xml_str).toprettyxml(indent=" ")
-                xml_bytes = format_xml.encode('utf-8')
-
-                xml_file.write(xml_bytes)
-                xml_file.seek(0)
-
-                meta_xml={
-                    "xml":xml_file,
-                    "territory_id":territories[0],
-                    "state_code":territories[2],
-                    "year":year
-                }
-
-                xml_files_arr.append(meta_xml)
-            else:
-                # print(f"Nada encontrado para cidade {territories[1]}-{territories[2]} no ano {year}")
-                pass
-
-        create_zip_for_territory(xml_files_arr, database, storage)
-        if need_update_zip_state:
-            create_zip_for_state(xml_files_arr, year, state, database, storage)
+    logger.info(f"Gerando ZIP do municipio {xml_file['territory_id']} no ano {xml_file['year']}")
+
+    zip_buffer = BytesIO()
+
+    with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
+        zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'])
+
+    zip_size = round(zip_buffer.tell() / (1024 * 1024), 2)
+    zip_buffer.seek(0)
+
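# Standalone sketch of the in-memory zip pattern used by both create_zip_for_*
# functions: write the XML payload into a BytesIO-backed ZipFile, measure the
# size, then rewind before handing the buffer to upload_content(). File names
# here are illustrative.
from io import BytesIO
from zipfile import ZipFile, ZIP_DEFLATED

buffer = BytesIO()
with ZipFile(buffer, "w", ZIP_DEFLATED) as zip_file:
    zip_file.writestr("3550308-2024.xml", b"<root/>")

size_mb = round(buffer.getbuffer().nbytes / (1024 * 1024), 2)
buffer.seek(0)  # rewind so an upload_fileobj-style consumer reads from byte 0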
+    try:
+        storage.upload_content(zip_path, zip_buffer)
+    except ClientError as e:
+        logger.error(f"Não foi possível fazer o upload do zip do município {xml_file['territory_id']}:\n{traceback.format_exc()}")

-            xml_file.close()
+    zip_buffer.close()

-def create_aggregates():
-    """
-    Create xml for all territories available in database
-    """
+    dict_query_info = {
+        "state_code" : xml_file['state_code'],
+        "territory_id" : xml_file['territory_id'],
+        "file_path": zip_path,
+        "year": xml_file['year'],
+        "hash_info": hx,
+        "file_size_mb": zip_size,
+    }

-    database = create_database_interface()
-    storage = create_storage_interface()
+    database.insert("INSERT INTO aggregates \
+        (territory_id, state_code, year, file_path, \
+        file_size_mb, hash_info, last_updated) \
+        VALUES (%(territory_id)s, %(state_code)s, \
+        %(year)s, %(file_path)s, %(file_size_mb)s, \
+        %(hash_info)s, NOW()) \
+        ON CONFLICT(file_path) \
+        DO UPDATE \
+        SET state_code=EXCLUDED.state_code, last_updated=NOW(), \
+        hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)

-    print("========== Script que agrega os arquivos .txt para .xml de territórios e estados ==========")
+    zip_buffer.close()
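# Sketch of the Path-based suffix swap that xml_content_generate() below relies
# on: with_suffix(".txt") covers both the ".pdf" case and the extensionless
# case that the removed endswith()/replace() branch handled by hand.
from pathlib import Path

assert Path("diario.pdf").with_suffix(".txt").name == "diario.txt"
assert Path("diario").with_suffix(".txt").name == "diario.txt"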
-    results_query_states = database.select("SELECT DISTINCT state_code FROM territories ORDER BY state_code ASC;")
-
-    for res in results_query_states:
-
-        results_query_territories = database.select(f"SELECT * FROM territories WHERE state_code='{res[0]}';")
+def xml_content_generate(state:str, year:str, territory:dict, list_gazzetes_content:list, storage:StorageInterface):
+    """
+    Generates xml file with gazzetes content
+    """
+
+    root = ET.Element("root")
+    xml_file = BytesIO()

-        create_aggregates_table(database)
+    logger.info(f"Gerando XML para cidade {territory['name']}-{state} no ano {year}")
+
+
+    meta_info_tag = ET.SubElement(root, "meta")
+    ET.SubElement(meta_info_tag, "uf").text = state
+    ET.SubElement(meta_info_tag, "ano_publicacao").text = str(year)
+    ET.SubElement(meta_info_tag, "municipio").text = territory['name']
+    ET.SubElement(meta_info_tag, "municipio_codigo_ibge").text = territory['id']
+    all_gazettes_tag = ET.SubElement(root, "diarios")
+
+    for gazette in list_gazzetes_content:
+        file_gazette_txt = BytesIO()
+        path_arq_bucket = Path(gazette['file_path']).with_suffix(".txt")
+
         try:
-            create_aggregates_for_territories_and_states(list(results_query_territories), res[0], database, storage)
-        except:
-            print(traceback.format_exc())
+            storage.get_file(path_arq_bucket, file_gazette_txt)
+        except ClientError as e:
+            logger.error(f"Erro na obtenção do conteúdo de texto do diário do territorio {gazette['territory_id']}: {e}")
+            file_gazette_txt.close()
+            continue

+        gazette_tag = ET.SubElement(all_gazettes_tag, "diario")
+        meta_gazette = ET.SubElement(gazette_tag, "meta-diario")
+        ET.SubElement(meta_gazette, "url_arquivo_original").text = gazette['file_url']
+        ET.SubElement(meta_gazette, "poder").text = gazette['power']
+        ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette['is_extra_edition'] else 'Não'
+        ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette['edition_number']) if str(gazette['edition_number']) is not None else "Não há"
+        ET.SubElement(meta_gazette, "data_publicacao").text = datetime.strftime((datetime.strptime(gazette['date'], "%Y-%m-%d").date()), "%d/%m")
+        ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8')
+
+        file_gazette_txt.close()
+
+    # Format XML file
+    xml_str = ET.tostring(root, encoding='unicode')
+    format_xml = minidom.parseString(xml_str).toprettyxml(indent=" ")
+    xml_bytes = format_xml.encode('utf-8')
+
+    xml_file.write(xml_bytes)
+    xml_file.seek(0)
+
+    data = {
+        "xml":xml_file.getvalue(),
+        "territory_id":territory['id'],
+        "state_code":state,
+        "year":year
+    }
+
+    xml_file.close()

-if __name__ == "__main__":
-    create_aggregates()
\ No newline at end of file
+    return data
\ No newline at end of file
diff --git a/tasks/interfaces.py b/tasks/interfaces.py
index e60327b..444e59b 100644
--- a/tasks/interfaces.py
+++ b/tasks/interfaces.py
@@ -1,5 +1,7 @@
-from typing import Dict, Iterable, Tuple
+from typing import Dict, Iterable, Tuple, Union
+from pathlib import Path
 import abc
+from io import BytesIO


 class DatabaseInterface(abc.ABC):
@@ -45,13 +47,13 @@ class StorageInterface(abc.ABC):
     """

     @abc.abstractmethod
-    def get_file(self, file_to_be_downloaded: str, destination) -> None:
+    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
         """
         Download the given file key in the destination on the host
         """

     @abc.abstractmethod
-    def upload_content(self, file_key: str, content_to_be_uploaded: str) -> None:
+    def upload_content(self, file_key: str, content_to_be_uploaded: Union[str, BytesIO]) -> None:
         """
         Upload the given content to the destination on the host
         """
@@ -65,7 +67,7 @@ def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
     @abc.abstractmethod
     def delete_file(self, file_key: str) -> None:
         """
-        Delete a file in the bucket S3.
+        Delete a file on the host.
         """
diff --git a/tasks/utils/__init__.py b/tasks/utils/__init__.py
index bbeee0d..aa40fac 100644
--- a/tasks/utils/__init__.py
+++ b/tasks/utils/__init__.py
@@ -14,5 +14,8 @@
     get_territory_data,
 )
 from .hash import (
-    hash_xml,
-)
\ No newline at end of file
+    hash_content,
+)
+from .need_upsert import (
+    zip_needs_upsert,
+)
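xml_content_generate() builds the document with ElementTree and pretty-prints it through minidom. A condensed, runnable sketch of the resulting XML shape with dummy values; the tag names match the patch, the data is illustrative (the patch imports cElementTree, for which plain ElementTree is the modern equivalent):

import xml.etree.ElementTree as ET
from xml.dom import minidom

root = ET.Element("root")
meta = ET.SubElement(root, "meta")
ET.SubElement(meta, "uf").text = "SP"
ET.SubElement(meta, "ano_publicacao").text = "2024"
ET.SubElement(meta, "municipio").text = "São Paulo"
ET.SubElement(meta, "municipio_codigo_ibge").text = "3550308"
diarios = ET.SubElement(root, "diarios")
diario = ET.SubElement(diarios, "diario")
ET.SubElement(diario, "conteudo").text = "..."

xml_str = ET.tostring(root, encoding="unicode")
print(minidom.parseString(xml_str).toprettyxml(indent=" "))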
diff --git a/tasks/utils/hash.py b/tasks/utils/hash.py
index 8713743..e4ef927 100644
--- a/tasks/utils/hash.py
+++ b/tasks/utils/hash.py
@@ -1,32 +1,15 @@
 import hashlib, os
+import logging

-def hash_xml(content : str):
-    """
-    Receives a text content of a XML file and returns its SHA-256 hash
-    """
-
-    seed_hash = bytes(os.environ['SEED_HASH'].encode('utf-8'))
-
-    # Escolha o algoritmo de hash (no caso, SHA-256)
-    algorithm = hashlib.sha256
-    result_hash = hashlib.pbkdf2_hmac(algorithm().name, content.encode('utf-8'), seed_hash, 100000)
-
-    # Converta o resultado para uma representação legível (hexadecimal)
-    hash_hex = result_hash.hex()
-
-    return hash_hex
+logger = logging.getLogger(__name__)

-def hash_zip(zip_content: bytes) -> str:
+def hash_content(content: bytes) -> str:
     """
-    Receives the content of a zip file and returns its SHA-256 hash.
+    Receives a content of byte format and returns its SHA-256 hash
     """
-    seed_hash = bytes(os.environ.get('SEED_HASH', 'default_seed').encode('utf-8'))

-    # Escolha o algoritmo de hash (no caso, SHA-256)
-    algorithm = hashlib.sha256
-    result_hash = hashlib.pbkdf2_hmac(algorithm().name, zip_content, seed_hash, 100000)
+    result_hash = hashlib.sha256(content).hexdigest()

-    # Converta o resultado para uma representação legível (hexadecimal)
-    hash_hex = result_hash.hex()
+    logger.info(f"Hash: {result_hash}")

-    return hash_hex
\ No newline at end of file
+    return result_hash
\ No newline at end of file
diff --git a/tasks/utils/index.py b/tasks/utils/index.py
index 683ec29..83d769c 100644
--- a/tasks/utils/index.py
+++ b/tasks/utils/index.py
@@ -1,6 +1,6 @@
 from typing import Dict, Iterable, List

-from tasks.interfaces import IndexInterface
+from ..interfaces import IndexInterface


 def get_documents_with_ids(
diff --git a/tasks/utils/need_upsert.py b/tasks/utils/need_upsert.py
new file mode 100644
index 0000000..6f40787
--- /dev/null
+++ b/tasks/utils/need_upsert.py
@@ -0,0 +1,18 @@
+from typing import Union
+from ..interfaces import DatabaseInterface
+
+
+def zip_needs_upsert(hx: Union[str, bytes], zip_path:str, database:DatabaseInterface):
+    """
+    Verifies if zip need an upsert to the database (update or insert)
+    """
+
+    need_update = True
+
+    query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \
+        WHERE file_path='{zip_path}';"))
+
+    if query_existing_aggregate:
+        need_update = hx != query_existing_aggregate[0][0]
+
+    return need_update
\ No newline at end of file

From 5593ac8e50f877cacca1c15e6d9bf02e1760b46a Mon Sep 17 00:00:00 2001
From: Cristian Furtado
Date: Mon, 12 Aug 2024 15:31:59 -0300
Subject: [PATCH 15/15] Adiciona timezone BR para agregados

---
 tasks/gazette_txt_to_xml.py | 62 ++++++++++++++++++++++++-------------
 tasks/utils/hash.py         |  4 ---
 tasks/utils/need_upsert.py  |  6 ++--
 3 files changed, 43 insertions(+), 29 deletions(-)
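Patch 15 stamps last_updated in application code instead of relying on the database's NOW(). A small sketch of two ways to produce that Brasília-time value; the first line is what the patch does (a naive datetime already shifted to UTC-3), while the aware variant is an alternative, not the patch's behavior:

from datetime import datetime, timedelta, timezone

br_timezone = timedelta(hours=-3)
naive_br = datetime.utcnow() + br_timezone      # patch's approach: naive, pre-shifted
aware_br = datetime.now(timezone(br_timezone))  # timezone-aware equivalent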
diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py
index 33665f1..80524ee 100644
--- a/tasks/gazette_txt_to_xml.py
+++ b/tasks/gazette_txt_to_xml.py
@@ -1,7 +1,7 @@
 import traceback
 import xml.etree.cElementTree as ET
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 from io import BytesIO
 from xml.dom import minidom
 from zipfile import ZipFile, ZIP_DEFLATED
@@ -12,8 +12,8 @@ from pathlib import Path

 from botocore.exceptions import ClientError

-need_update_zip_state = False
 logger = logging.getLogger(__name__)
+br_timezone = timedelta(hours=-3)


 def create_aggregates(database:DatabaseInterface, storage:StorageInterface):
@@ -21,9 +21,22 @@ def create_aggregates(database:DatabaseInterface, storage:StorageInterface):
     """
     logger.info("Agregando os arquivos TXT para XML de territórios e estados...")
-
-    results_query_states = list(database.select(f"SELECT t.state_code AS code, json_agg(json_build_object('id',t.id, 'name',t.name)) FROM territories t GROUP BY code;"))

+    results_query_states = list(database.select("""SELECT
+                    t.state_code AS code,
+                    json_agg(json_build_object('id',t.id, 'name',t.name))
+                FROM
+                    territories t
+                WHERE
+                    t.id in (SELECT DISTINCT
+                        territory_id
+                    FROM
+                        gazettes
+                    )
+                GROUP BY
+                    code
+                """
+    ))

     for state, territories_list in results_query_states:
         try:
@@ -50,26 +63,25 @@ def create_aggregates_for_territories_and_states(territories_list:list, state:st
                 gazettes g
             WHERE
                 g.territory_id='{territory['id']}'
+                and g.processed=true
             GROUP BY
                 year
             """
-        ))
-
+        ))
         for year, list_gazzetes_content in query_content_gazzetes_for_territory:
             year = str(int(year))

             meta_xml = xml_content_generate(state, year, territory, list_gazzetes_content, storage)

-            if year not in xml_files_dict:
-                xml_files_dict[year] = []
-
-            xml_files_dict[year].append(meta_xml)
+            xml_files_dict.setdefault(year, []).append(meta_xml)

-            territory_slug = get_territory_slug(territory['name'], state)
+            territory_slug = get_territory_slug(meta_xml['territory_name'], state)
             zip_path = f"aggregates/{meta_xml['state_code']}/{territory_slug}_{meta_xml['territory_id']}_{meta_xml['year']}.zip"
             hx = hash_content(meta_xml['xml'])

+            logger.debug(f"Content hash for xml file of {zip_path}: {hx}")
+
             need_update_territory_zip = zip_needs_upsert(hx, zip_path, database)
@@ -97,7 +109,8 @@ def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code

         with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
             for xml_file in xmls:
-                zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'])
+                territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code'])
+                zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml'])

         zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2)
         zip_buffer.seek(0)
@@ -107,6 +120,8 @@ def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code

         hx = hash_content(zip_buffer_copy.read())

+        logger.debug(f"Content hash for {zip_path}: {hx}")
+
         dict_query_info = {
             "state_code" : state_code,
             "territory_id" : None,
@@ -114,6 +129,7 @@ def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code
             "year": year,
             "hash_info": hx,
             "file_size_mb": zip_size,
+            "last_updated": datetime.utcnow() + br_timezone,
         }

         database.insert("INSERT INTO aggregates \
@@ -121,10 +137,10 @@ def create_zip_for_state(xmls_years_dict:dict, arr_years_update:list, state_code
             file_size_mb, hash_info, last_updated) \
             VALUES (%(territory_id)s, %(state_code)s, \
             %(year)s, %(file_path)s, %(file_size_mb)s, \
-            %(hash_info)s, NOW()) \
+            %(hash_info)s, %(last_updated)s) \
             ON CONFLICT(file_path) \
             DO UPDATE \
-            SET state_code = EXCLUDED.state_code, last_updated=NOW(), \
+            SET state_code = EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \
             hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)

         zip_buffer.close()
@@ -135,12 +151,13 @@ def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:Datab
     Creating .zip files for the year's territories
     """

-    logger.info(f"Gerando ZIP do municipio {xml_file['territory_id']} no ano {xml_file['year']}")
+    logger.info(f"Gerando ZIP do municipio {xml_file['territory_name']}-{xml_file['territory_id']} no ano {xml_file['year']}")

     zip_buffer = BytesIO()

     with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
-        zip_file.writestr(f"{xml_file['territory_id']}-{xml_file['year']}.xml", xml_file['xml'])
+        territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code'])
+        zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml'])

     zip_size = round(zip_buffer.tell() / (1024 * 1024), 2)
     zip_buffer.seek(0)
@@ -159,6 +176,7 @@ def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:Datab
         "year": xml_file['year'],
         "hash_info": hx,
         "file_size_mb": zip_size,
+        "last_updated": datetime.utcnow() + br_timezone,
     }

     database.insert("INSERT INTO aggregates \
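# Assumed behavior, for illustration only: get_territory_slug() derives an
# ASCII-safe slug from the territory name and state, so the entry written for
# São Paulo/SP in 2024 would be named something like
# "sao-paulo_3550308_2024.xml" -- unique per territory even when two cities
# share a name across states.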
@@ -166,10 +184,10 @@ def create_zip_for_territory(hx:str, zip_path:str, xml_file:dict, database:Datab
         file_size_mb, hash_info, last_updated) \
         VALUES (%(territory_id)s, %(state_code)s, \
         %(year)s, %(file_path)s, %(file_size_mb)s, \
-        %(hash_info)s, NOW()) \
+        %(hash_info)s, %(last_updated)s) \
         ON CONFLICT(file_path) \
         DO UPDATE \
-        SET state_code=EXCLUDED.state_code, last_updated=NOW(), \
+        SET state_code=EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \
         hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)

     zip_buffer.close()
@@ -185,7 +203,6 @@ def xml_content_generate(state:str, year:str, territory:dict, list_gazzetes_cont

     logger.info(f"Gerando XML para cidade {territory['name']}-{state} no ano {year}")

-
     meta_info_tag = ET.SubElement(root, "meta")
     ET.SubElement(meta_info_tag, "uf").text = state
     ET.SubElement(meta_info_tag, "ano_publicacao").text = str(year)
@@ -200,13 +217,13 @@ def xml_content_generate(state:str, year:str, territory:dict, list_gazzetes_cont
         try:
             storage.get_file(path_arq_bucket, file_gazette_txt)
         except ClientError as e:
-            logger.error(f"Erro na obtenção do conteúdo de texto do diário do territorio {gazette['territory_id']}: {e}")
+            logger.warning(f"Erro na obtenção do conteúdo de texto do diário do territorio {path_arq_bucket}: {e}")
             file_gazette_txt.close()
             continue

         gazette_tag = ET.SubElement(all_gazettes_tag, "diario")
-        meta_gazette = ET.SubElement(gazette_tag, "meta-diario")
+        meta_gazette = ET.SubElement(gazette_tag, "meta_diario")
         ET.SubElement(meta_gazette, "url_arquivo_original").text = gazette['file_url']
         ET.SubElement(meta_gazette, "poder").text = gazette['power']
         ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette['is_extra_edition'] else 'Não'
@@ -227,8 +244,9 @@ def xml_content_generate(state:str, year:str, territory:dict, list_gazzetes_cont
     data = {
         "xml":xml_file.getvalue(),
         "territory_id":territory['id'],
+        "territory_name":territory['name'],
         "state_code":state,
-        "year":year
+        "year":year,
     }

     xml_file.close()
diff --git a/tasks/utils/hash.py b/tasks/utils/hash.py
index e4ef927..f85536b 100644
--- a/tasks/utils/hash.py
+++ b/tasks/utils/hash.py
@@ -1,7 +1,5 @@
 import hashlib, os
-import logging

-logger = logging.getLogger(__name__)

 def hash_content(content: bytes) -> str:
     """
@@ -10,6 +8,4 @@ def hash_content(content: bytes) -> str:

     result_hash = hashlib.sha256(content).hexdigest()

-    logger.info(f"Hash: {result_hash}")
-
     return result_hash
\ No newline at end of file
diff --git a/tasks/utils/need_upsert.py b/tasks/utils/need_upsert.py
index 6f40787..5cc6a6b 100644
--- a/tasks/utils/need_upsert.py
+++ b/tasks/utils/need_upsert.py
@@ -7,12 +7,12 @@ def zip_needs_upsert(hx: Union[str, bytes], zip_path:str, database:DatabaseInter
     Verifies if zip need an upsert to the database (update or insert)
     """

-    need_update = True
+    needs_update_or_inexists = True

     query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \
         WHERE file_path='{zip_path}';"))

     if query_existing_aggregate:
-        need_update = hx != query_existing_aggregate[0][0]
+        needs_update_or_inexists = hx != query_existing_aggregate[0][0]

-    return need_update
\ No newline at end of file
+    return needs_update_or_inexists
\ No newline at end of file
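Taken together, the series converges on a hash-then-upsert flow: hash the artifact, ask zip_needs_upsert() whether the stored hash differs, and only rebuild and upsert when it does. A self-contained sketch under those assumptions; hash_content() matches the patch, while FakeDatabase is a hypothetical stand-in for DatabaseInterface:

import hashlib

def hash_content(content: bytes) -> str:
    # same as tasks/utils/hash.py after patch 15
    return hashlib.sha256(content).hexdigest()

class FakeDatabase:
    """Minimal stand-in returning pre-seeded rows for select()."""
    def __init__(self, rows):
        self._rows = rows
    def select(self, sql):
        return self._rows

def zip_needs_upsert(hx, zip_path, database):
    rows = list(database.select(
        f"SELECT hash_info FROM aggregates WHERE file_path='{zip_path}';"))
    return hx != rows[0][0] if rows else True

db = FakeDatabase([(hash_content(b"old xml"),)])
assert zip_needs_upsert(hash_content(b"old xml"), "aggregates/SP/x.zip", db) is False
assert zip_needs_upsert(hash_content(b"new xml"), "aggregates/SP/x.zip", db) is True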