Skip to content

Commit

Permalink
feat: add DATASUS-SIM flow
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurfg committed May 13, 2024
1 parent bea62cb commit 88b164d
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
7 changes: 5 additions & 2 deletions pipelines/utils/crawler_datasus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class constants(Enum): # pylint: disable=c0103
DATASUS_DATABASE = {
"br_ms_cnes": "CNES",
"br_ms_sia": "SIA",
"br_ms_sih": "SIH"
"br_ms_sih": "SIH",
"br_ms_sim": "SIM"
}

DATASUS_DATABASE_TABLE = {
Expand All @@ -45,7 +46,9 @@ class constants(Enum): # pylint: disable=c0103
"psicossocial": "PS",
#SIH
"servicos_profissionais": "SP",
"aihs_reduzidas": "RD,"
"aihs_reduzidas": "RD",
#SIM
"microdados_teste": "DO",
}


Expand Down
107 changes: 107 additions & 0 deletions pipelines/utils/crawler_datasus/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,110 @@
)
flow_sihsus.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_sihsus.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)


# DATASUS-SIM dump flow: checks the DATASUS FTP for SIM files newer than the
# most recent date registered in the metadata API, downloads/decompresses the
# DBC files, converts them to parquet and uploads to GCS/BigQuery; optionally
# triggers DBT materialization and a Django metadata update afterwards.
with Flow(name="DATASUS-SIM", code_owners=["jeantozzi"]) as flow_simsus:
    # Parameters
    dataset_id = Parameter("dataset_id", default="br_ms_sim", required=False)
    table_id = Parameter("table_id", default = 'microdados_teste', required=False)
    year_first_two_digits = Parameter("year_first_two_digits", required=False)
    update_metadata = Parameter("update_metadata", default=True, required=False)
    year_month_to_extract = Parameter("year_month_to_extract",default='', required=False)
    materialization_mode = Parameter(
        "materialization_mode", default="dev", required=False
    )
    materialize_after_dump = Parameter(
        "materialize_after_dump", default=False, required=False
    )
    dbt_alias = Parameter("dbt_alias", default=True, required=False)

    rename_flow_run = rename_current_flow_run_dataset_table(
        prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
    )

    # List FTP files more recent than the last date registered in the API;
    # empty result means there is nothing new to ingest.
    ftp_files = check_files_to_parse(
        dataset_id=dataset_id,
        table_id=table_id,
        year_month_to_extract=year_month_to_extract,
    )

    with case(is_empty(ftp_files), True):
        # Fixed copy-paste from the SIH flow: this is the SIM flow.
        log_task(
            "Os dados do FTP SIM ainda não foram atualizados para o ano/mes mais recente"
        )

    with case(is_empty(ftp_files), False):

        dbc_files = access_ftp_download_files_async(
            file_list=ftp_files,
            dataset_id=dataset_id,
            table_id=table_id,
        )

        dbf_files = decompress_dbc(
            file_list=dbc_files, dataset_id=dataset_id, upstream_tasks=[dbc_files]
        )

        # NOTE(review): receives dbc_files (not dbf_files) — presumably the task
        # derives the .dbf paths from the .dbc names; matches the SIH flow above.
        # TODO confirm against read_dbf_save_parquet_chunks.
        files_path = read_dbf_save_parquet_chunks(
            file_list=dbc_files,
            table_id=table_id,
            dataset_id=dataset_id,
            upstream_tasks=[dbc_files,dbf_files],
        )

        wait_upload_table = create_table_and_upload_to_gcs(
            data_path=files_path,
            dataset_id=dataset_id,
            table_id=table_id,
            dump_mode="append",
            wait=files_path,
        )

        with case(materialize_after_dump, True):
            # Trigger DBT flow run
            current_flow_labels = get_current_flow_labels()
            materialization_flow = create_flow_run(
                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                parameters={
                    "dataset_id": dataset_id,
                    "table_id": table_id,
                    "mode": materialization_mode,
                    "dbt_alias": dbt_alias,
                    "dbt_command": "run/test",
                    "disable_elementary": False,
                },
                labels=current_flow_labels,
                run_name=f"Materialize {dataset_id}.{table_id}",
            )

            wait_for_materialization = wait_for_flow_run(
                materialization_flow,
                stream_states=True,
                stream_logs=True,
                raise_final_state=True,
            )
            wait_for_materialization.max_retries = (
                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
            )
            wait_for_materialization.retry_delay = timedelta(
                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
            )

            with case(update_metadata, True):
                # NOTE(review): uses a month column and date_format "%Y-%m",
                # but check_files_to_parse queries the API with "%Y" for
                # br_ms_sim (SIM is yearly) — confirm which granularity the
                # metadata coverage should use.
                update_django_metadata(
                    dataset_id=dataset_id,
                    table_id=table_id,
                    date_column_name={"year": "ano", "month": "mes"},
                    date_format="%Y-%m",
                    coverage_type="part_bdpro",
                    time_delta={"months": 6},
                    prefect_mode=materialization_mode,
                    bq_project="basedosdados",
                    upstream_tasks=[wait_for_materialization],
                )
flow_simsus.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_simsus.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)


20 changes: 14 additions & 6 deletions pipelines/utils/crawler_datasus/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,19 @@ def check_files_to_parse(
) -> list[str]:
log(f"------- Extracting last date from api for {dataset_id}.{table_id}")
# 1. extrair data mais atual da api
last_date = get_api_most_recent_date(
dataset_id=dataset_id,
table_id=table_id,
date_format="%Y-%m",
)
if dataset_id == 'br_ms_sim':
last_date = get_api_most_recent_date(
dataset_id=dataset_id,
table_id=table_id,
date_format="%Y",
)
else:
last_date = get_api_most_recent_date(
dataset_id=dataset_id,
table_id=table_id,
date_format="%Y-%m",
)


log("------- Building next year/month to parse")

Expand Down Expand Up @@ -89,7 +97,7 @@ def check_files_to_parse(
available_dbs = list_datasus_dbc_files(
datasus_database=datasus_database, datasus_database_table=datasus_database_table
)
#

if len(year_month_to_extract) == 0:
list_files = [file for file in available_dbs if file.split('/')[-1][4:8] == year_month_to_parse]
else:
Expand Down
5 changes: 5 additions & 0 deletions pipelines/utils/crawler_datasus/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ def list_datasus_dbc_files(
raise ValueError("No group assigned to SIH_group")
available_dbs.extend(ftp.nlst(f"dissemin/publicos/SIHSUS/200801_/Dados/{datasus_database_table}*.DBC"))

if datasus_database == "SIM":
if not datasus_database_table:
raise ValueError("No group assigned to SIH_group")
available_dbs.extend(ftp.nlst(f"dissemin/publicos/SIM/CID10/DORES/{datasus_database_table}*.DBC"))

except Exception as e:
raise e

Expand Down
2 changes: 1 addition & 1 deletion pipelines/utils/metadata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ def format_and_check_date(date_values: tuple, date_format: str) -> str:
if date_format == "%Y-%m" and end_year is not None and end_month is not None:
return f"{end_year:04d}-{end_month:02d}"

if date_format == "Y%" and end_year is not None:
if date_format == "%Y" and end_year is not None:
return f"{end_year:04d}"

raise ValueError(
Expand Down

0 comments on commit 88b164d

Please sign in to comment.