feat: add success flow and some utils/base class modifications
jeantozzi committed Jun 23, 2024
1 parent 1b6db14 commit 5669daa
Showing 3 changed files with 54 additions and 17 deletions.
8 changes: 4 additions & 4 deletions pipelines/utils/crawler_datasus/flows.py
@@ -360,7 +360,7 @@
with Flow(name="DATASUS-SIM", code_owners=["jeantozzi"]) as flow_simsus:
# Parameters
dataset_id = Parameter("dataset_id", default="br_ms_sim", required=False)
table_id = Parameter("table_id", default = 'microdados_teste', required=False)
table_id = Parameter("table_id", default = "microdados_teste", required=False)
year_first_two_digits = Parameter("year_first_two_digits", required=False)
update_metadata = Parameter("update_metadata", default=True, required=False)
year_month_to_extract = Parameter("year_month_to_extract",default='', required=False)
@@ -390,9 +390,9 @@
with case(is_empty(ftp_files), False):

dbc_files = access_ftp_download_files_async(
file_list=ftp_files,
dataset_id=dataset_id,
table_id=table_id,
file_list=ftp_files,
dataset_id=dataset_id,
table_id=table_id,
)

dbf_files = decompress_dbc(
29 changes: 21 additions & 8 deletions pipelines/utils/crawler_datasus/tasks.py
@@ -20,6 +20,7 @@
from pipelines.utils.crawler_datasus.utils import (
download_chunks,
list_datasus_dbc_files,
year_sigla_uf_parser,
year_month_sigla_uf_parser,
dbf_to_parquet,
post_process_dados_complementares,
@@ -98,7 +99,8 @@ def check_files_to_parse(
datasus_database=datasus_database, datasus_database_table=datasus_database_table
)
if dataset_id == "br_ms_sim":
list_files = [file for file in available_dbs if file.split('/')[-1][4:8] == str(last_date.year + 1)]
list_files = [file for file in available_dbs if file.split('/')[-1][4:8] == str(last_date.year + 1)
and "BR" not in file] # Ignores the files that consolidate data at the BR level
else:
if len(year_month_to_extract) == 0:
list_files = [file for file in available_dbs if file.split('/')[-1][4:8] == year_month_to_parse]
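
A minimal standalone sketch of the new br_ms_sim selection logic, using hypothetical FTP paths (the real available_dbs comes from list_datasus_dbc_files): the parser added in this commit assumes SIM yearly files follow the DO{UF}{YYYY}.dbc pattern, so characters 4:8 of the basename hold the year, and names containing "BR" are the nationwide consolidation that the added condition skips.

available_dbs = [
    "SIM/CID10/DORES/DOSP2022.dbc",
    "SIM/CID10/DORES/DORJ2022.dbc",
    "SIM/CID10/DORES/DOBR2022.dbc",  # national consolidation, now skipped
    "SIM/CID10/DORES/DOSP2021.dbc",  # different year, skipped as before
]
target_year = "2022"  # plays the role of str(last_date.year + 1)

list_files = [
    file
    for file in available_dbs
    if file.split("/")[-1][4:8] == target_year and "BR" not in file
]
print(list_files)  # ['SIM/CID10/DORES/DOSP2022.dbc', 'SIM/CID10/DORES/DORJ2022.dbc']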
@@ -135,25 +137,26 @@ def access_ftp_download_files_async(
os.makedirs(input_path, exist_ok=True)

# https://github.com/AlertaDengue/PySUS/blob/main/pysus/ftp/__init__.py#L156
log(f"------ dowloading {table_id} files from DATASUS FTP")

log(f"------ downloading {table_id} files from DATASUS FTP")

# Determine the appropriate parser based on the dataset_id
parser = year_sigla_uf_parser if dataset_id == "br_ms_sim" else year_month_sigla_uf_parser

asyncio.run(
download_chunks(
files=file_list,
output_dir=input_path,
max_parallel=max_parallel,
chunk_size=chunk_size,
parser=parser
)
)

# Generate the dbc_files_path_list using the chosen parser
dbc_files_path_list = [
os.path.join(
input_path, year_month_sigla_uf_parser(file=file), file.split("/")[-1]
)
os.path.join(input_path, parser(file=file), file.split("/")[-1])
for file in tqdm(file_list)
]
]

return dbc_files_path_list
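
To illustrate the dispatch, a small self-contained sketch of how the chosen parser shapes the local download path (input_path and the FTP path are hypothetical; year_sigla_uf_parser mirrors the new helper added in utils.py below, and the pre-existing CNES-style parser is stubbed out):

import os

def year_sigla_uf_parser(file: str) -> str:
    # Same logic as the new helper: last 4 chars are the year,
    # the 2 chars before them are the state abbreviation.
    file_name = os.path.splitext(os.path.basename(file))[0]
    return f"ano={file_name[-4:]}/sigla_uf={file_name[-6:-4]}"

def year_month_sigla_uf_parser(file: str) -> str:
    # Stand-in for the existing CNES-style parser (body not shown in this diff).
    raise NotImplementedError

dataset_id = "br_ms_sim"
input_path = "/tmp/br_ms_sim/input"          # hypothetical local path
ftp_file = "SIM/CID10/DORES/DOSP2022.dbc"    # hypothetical FTP path

parser = year_sigla_uf_parser if dataset_id == "br_ms_sim" else year_month_sigla_uf_parser
local_path = os.path.join(input_path, parser(file=ftp_file), ftp_file.split("/")[-1])
print(local_path)  # /tmp/br_ms_sim/input/ano=2022/sigla_uf=SP/DOSP2022.dbc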

@@ -164,6 +167,14 @@ def decompress_dbc(file_list: list, dataset_id: str) -> None:
Convert dbc to dbf format
"""

# ShellTask to remove the blast-dbf directory if it already exists
remove_existing_dir_task = ShellTask(
name="Remove Existing blast-dbf Directory",
command=f"rm -rf /tmp/{dataset_id}/blast-dbf",
return_all=True,
log_stderr=True,
)

# ShellTask to create the blast-dbf directory
create_dir_task = ShellTask(
name="Create blast-dbf Directory",
@@ -188,6 +199,8 @@ def decompress_dbc(file_list: list, dataset_id: str) -> None:
log_stderr=True,
)

remove_existing_dir_task.run()
log("------ Removing blast-dbf repository, in case it already exists")
create_dir_task.run()
log("------ Cloning blast-dbf repository")
clone_repo_task.run()
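
A condensed sketch of the reset-then-recreate sequence introduced above, assuming the collapsed clone task runs a plain git clone into /tmp/<dataset_id>/blast-dbf; since git clone refuses a non-empty destination, deleting the directory first keeps re-runs from failing (the Prefect 1.x import path and the dataset_id value are assumptions):

from prefect.tasks.shell import ShellTask  # assumed Prefect 1.x import used by these tasks

dataset_id = "br_ms_sim"  # hypothetical value, normally passed to decompress_dbc
blast_dir = f"/tmp/{dataset_id}/blast-dbf"

# rm -rf is a no-op when the path does not exist, so this is safe on first runs
# and clears leftovers from previous runs otherwise.
remove_existing_dir_task = ShellTask(
    name="Remove Existing blast-dbf Directory",
    command=f"rm -rf {blast_dir}",
    return_all=True,
    log_stderr=True,
)
remove_existing_dir_task.run()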
@@ -340,7 +353,7 @@ def read_dbf_save_parquet_chunks(file_list: list, table_id: str, dataset_id:str=

dbf_file_list = [file.replace(".dbc", ".dbf") for file in file_list]
_counter = 0
log(f'----coutner {_counter}')
log(f'----counter {_counter}')
for file in tqdm(dbf_file_list):


34 changes: 29 additions & 5 deletions pipelines/utils/crawler_datasus/utils.py
@@ -14,7 +14,7 @@
import aioftp
import pandas as pd
import os
from typing import List, Dict, Tuple
from typing import List, Dict, Tuple, Callable
from pipelines.utils.crawler_datasus.constants import constants as datasus_constants
from pipelines.utils.utils import log

@@ -133,7 +133,7 @@ def list_datasus_dbc_files(
ftp.close()
return available_dbs

async def download_chunks(files: List[str], output_dir: str, chunk_size: int, max_parallel: int):
async def download_chunks(files: List[str], output_dir: str, chunk_size: int, max_parallel: int, parser: Callable[[str], str]):
"""This function is used to sequentially control the number of concurrent downloads to avoid
#OSError: [Errno 24] Too many open files
@@ -146,10 +146,10 @@ async def download_chunks(files: List[str], output_dir: str, chunk_size: int, ma

for i in range(0, len(files), chunk_size):
chunk = files[i:i + chunk_size]
await download_files(files=chunk, output_dir=output_dir, max_parallel=max_parallel)
await download_files(files=chunk, output_dir=output_dir, max_parallel=max_parallel, parser=parser)


async def download_files(files: list, output_dir: str, max_parallel: int)-> None:
async def download_files(files: list, output_dir: str, max_parallel: int, parser: Callable[[str], str])-> None:
"""Download files asynchronously and save them in the given output directory
Args:
@@ -161,7 +161,7 @@ async def download_files(files: list, output_dir: str, max_parallel: int)-> None
semaphore = asyncio.Semaphore(max_parallel)
async with semaphore:
tasks = [
download_file(file, f"{output_dir}/{year_month_sigla_uf_parser(file=file)}")
download_file(file, f"{output_dir}/{parser(file=file)}")
for file in files
]
await asyncio.gather(*tasks)
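
A self-contained sketch of the chunked download pattern with the parser threaded end to end; the aioftp transfer is replaced by a stub and toy_parser is purely hypothetical, so the snippet runs anywhere:

import asyncio
from typing import Callable, List

async def download_file(file: str, output_dir: str) -> None:
    # Stand-in for the aioftp transfer so the sketch has no FTP dependency.
    await asyncio.sleep(0.01)
    print(f"{file} -> {output_dir}")

async def download_files(files: List[str], output_dir: str, max_parallel: int,
                         parser: Callable[[str], str]) -> None:
    semaphore = asyncio.Semaphore(max_parallel)
    async with semaphore:
        tasks = [download_file(file, f"{output_dir}/{parser(file=file)}") for file in files]
        await asyncio.gather(*tasks)

async def download_chunks(files: List[str], output_dir: str, chunk_size: int,
                          max_parallel: int, parser: Callable[[str], str]) -> None:
    # Sequential chunks bound the number of simultaneously open connections,
    # the "Too many open files" error mentioned in the docstring.
    for i in range(0, len(files), chunk_size):
        await download_files(files=files[i:i + chunk_size], output_dir=output_dir,
                             max_parallel=max_parallel, parser=parser)

def toy_parser(file: str) -> str:
    # Hypothetical stand-in for year_sigla_uf_parser / year_month_sigla_uf_parser.
    return f"ano={file[-8:-4]}/sigla_uf={file[2:4]}"

asyncio.run(download_chunks(
    files=["DOSP2022.dbc", "DORJ2022.dbc", "DOMG2022.dbc"],
    output_dir="/tmp/out", chunk_size=2, max_parallel=2, parser=toy_parser,
))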
Expand Down Expand Up @@ -214,6 +214,30 @@ def line_file_parser(file_line)-> Tuple[List, Dict]:

return name, info

def year_sigla_uf_parser(file: str) -> str:
"""Receives a DATASUS MS SIM file and parses year and sigla_uf
to create proper partitions.
Args:
file (str): The file path.
Returns:
str: Partition string in the format "ano={year}/sigla_uf={sigla_uf}".
"""
try:
# Extract the file name without the path and extension
file_name = os.path.splitext(os.path.basename(file))[0]

# Parse year (last 4 digits)
year = file_name[-4:]

# Parse and build state (2 characters before the last 4 digits)
sigla_uf = file_name[-6:-4]

except IndexError:
raise ValueError("Unable to parse year or sigla_uf from file")

return f"ano={year}/sigla_uf={sigla_uf}"

def year_month_sigla_uf_parser(file: str) -> str:
"""receives a DATASUS CNES ST file and parse year, month and sigla_uf
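
Quick usage check of the new year_sigla_uf_parser (file paths are hypothetical); because only the basename is inspected, any stem ending in two state letters followed by a four-digit year yields the same partition string:

from pipelines.utils.crawler_datasus.utils import year_sigla_uf_parser

year_sigla_uf_parser(file="SIM/CID10/DORES/DOSP2022.dbc")  # -> "ano=2022/sigla_uf=SP"
year_sigla_uf_parser(file="/tmp/input/DORJ2021.dbc")       # -> "ano=2021/sigla_uf=RJ"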
