Skip to content

Commit

Permalink
Merge branch 'main' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 30, 2024
2 parents 10c7c88 + b1fb895 commit b864f5c
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@
from pipelines.datasets.br_cnj_improbidade_administrativa.flows import *
from pipelines.datasets.br_ms_sih.flows import *
from pipelines.datasets.br_ms_sinan.flows import *
from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
17 changes: 17 additions & 0 deletions pipelines/datasets/br_cgu_emendas_parlamentares/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from enum import Enum

import numpy as np


class constants(Enum): # pylint: disable=c0103
URL = "https://portaldatransparencia.gov.br/download-de-dados/emendas-parlamentares/UNICO"
INPUT = "/tmp/input/"
OUTPUT = "/tmp/output/"
VALUES_FLOAT = ['Valor Empenhado', 'Valor Liquidado', 'Valor Pago', 'Valor Restos A Pagar Inscritos', 'Valor Restos A Pagar Cancelados', 'Valor Restos A Pagar Pagos']
QUERY = """SELECT
(SELECT count(*) as total FROM `basedosdados.br_cgu_emendas_parlamentares.microdados`) AS total,
(SELECT TIMESTAMP_MILLIS(creation_time) as last_modified_time
FROM `basedosdados.br_cgu_emendas_parlamentares.__TABLES_SUMMARY__`
WHERE table_id = 'microdados') AS last_modified_time;
"""
110 changes: 110 additions & 0 deletions pipelines/datasets/br_cgu_emendas_parlamentares/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
from datetime import timedelta
from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.constants import constants
from pipelines.datasets.br_cgu_emendas_parlamentares.schedules import (
every_day_emendas_parlamentares
)
from pipelines.datasets.br_cgu_emendas_parlamentares.tasks import (
convert_str_to_float,
get_last_modified_time
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import (
update_django_metadata,
check_if_data_is_outdated,
)
from pipelines.utils.tasks import ( # update_django_metadata,
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
)


with Flow(
name="br_cgu_emendas_parlamentares.microdados",
code_owners=[
"tricktx",
],
) as br_cgu_emendas_parlamentares_flow:
dataset_id = Parameter("dataset_id", default="br_cgu_emendas_parlamentares", required=False)
table_id = Parameter("table_id", default="microdados", required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)
materialization_mode = Parameter("materialization_mode", default="dev", required=False)
materialize_after_dump = Parameter("materialize_after_dump", default=False, required=False)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)

max_modified_time = get_last_modified_time()

outdated = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
date_type="last_update_date",
data_source_max_date=max_modified_time,
upstream_tasks=[max_modified_time],
)

with case(outdated, True):
output_path = convert_str_to_float()
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
upstream_tasks=[output_path],
)


with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"dbt_command": "run/test",
"disable_elementary": False,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks = [wait_upload_table]
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
date_format="%Y",
coverage_type="all_bdpro",
prefect_mode=materialization_mode,
historical_database=False,
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
)

br_cgu_emendas_parlamentares_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_emendas_parlamentares_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_emendas_parlamentares_flow.schedule = every_day_emendas_parlamentares
27 changes: 27 additions & 0 deletions pipelines/datasets/br_cgu_emendas_parlamentares/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock, IntervalClock

from pipelines.constants import constants

every_day_emendas_parlamentares = Schedule(
clocks=[
CronClock(
cron="30 19 * * *", # At 19:30 every day
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_emenda_parlamentar",
"table_id": "emenda_parlamentar",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"update_metadata": True,
},
),
],
)
58 changes: 58 additions & 0 deletions pipelines/datasets/br_cgu_emendas_parlamentares/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
from urllib.request import urlopen
from datetime import datetime
import zipfile
import basedosdados as bd
import pandas as pd
from io import BytesIO
from pipelines.utils.utils import log
from pipelines.datasets.br_cgu_emendas_parlamentares.constants import constants
from prefect import task
import os

def download_unzip_file():
if not os.path.exists(constants.INPUT.value):
os.mkdir(constants.INPUT.value)
try:
r = urlopen(constants.URL.value)
zip = zipfile.ZipFile(BytesIO(r.read()))
zip.extractall(path=constants.INPUT.value)
except Exception as e:
print(e)
log("Erro ao baixar e descompactar arquivo")

@task
def convert_str_to_float():
df = pd.read_csv(f"{constants.INPUT.value}Emendas.csv", sep=';', encoding='latin1')
log("Convertendo valores para float")

for _ in constants.VALUES_FLOAT.value:
df[_] = df[_].str.replace(',', '.').astype(float)

output = f"{constants.OUTPUT.value}microdados.csv"

if not os.path.exists(constants.OUTPUT.value):
os.mkdir(constants.OUTPUT.value)

df.to_csv(output, sep=',', encoding='utf-8', index=False)
log("---------------- Tabela salva -------------------")
return constants.OUTPUT.value

@task
def get_last_modified_time():
download_unzip_file()
emendas = pd.read_csv(f"{constants.INPUT.value}Emendas.csv", sep=';', encoding='latin1')

data = bd.read_sql(
constants.QUERY.value,
billing_project_id="basedosdados",
from_file=True )

date = data.iloc[0].values
log("Data da última atualização: " + str(date[1]))
log("Quantidade de linhas na tabela: " + str(date[0]))
log("Quantidade de linhas no arquivo: " + str(emendas.shape[0]))
if emendas.shape[0] > date[0]:
return datetime.today()

return date[1]

0 comments on commit b864f5c

Please sign in to comment.