Skip to content

Commit

Permalink
Merge pull request #1 from arthurfg/feat/add-pre-commit
Browse files Browse the repository at this point in the history
feat: testing pre-commit hooks
  • Loading branch information
arthurfg authored Apr 16, 2024
2 parents 72debf1 + da1b3d5 commit 44847f2
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 31 deletions.
20 changes: 20 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: check-added-large-files # prevents adding large files
- id: detect-private-key # detects private keys
- id: fix-byte-order-marker # fixes BOM
- id: fix-encoding-pragma # fixes encoding pragma
- id: no-commit-to-branch # prevents committing to protected branches
- id: trailing-whitespace # prevents trailing whitespace
- repo: https://github.com/python-poetry/poetry
rev: 1.8.0
hooks:
- id: poetry-check
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.7
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
101 changes: 71 additions & 30 deletions flows/brazilian_reits/brazilian_reits.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
from prefect import flow, task, get_run_logger
import requests
from bs4 import BeautifulSoup
Expand All @@ -14,8 +15,10 @@
from google.cloud import bigquery
from pathlib import Path
from google.cloud.storage import Client, transfer_manager

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./dbt-service-account.json"


def create_bigquery_table(table_id, overwrite=True) -> None:
"""
Creates a BigQuery table with external Parquet data source.
Expand All @@ -24,12 +27,12 @@ def create_bigquery_table(table_id, overwrite=True) -> None:
table_id (str): The identifier of the table to be created in BigQuery.
overwrite (bool, optional): If True, the existing table with the same `table_id` will be overwritten.
If False and the table already exists, the function does nothing. Default is True.
"""
source_uris = [
f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/*"
]
bq_table_id = f"arthur-data-engineering-course.brazilian_reits_staging.{table_id}_staging"
"""

source_uris = [f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/*"]
bq_table_id = (
f"arthur-data-engineering-course.brazilian_reits_staging.{table_id}_staging"
)
client = bigquery.Client()
if overwrite:
client.delete_table(bq_table_id, not_found_ok=True) # Make an API request.
Expand All @@ -39,12 +42,14 @@ def create_bigquery_table(table_id, overwrite=True) -> None:
external_config = bigquery.ExternalConfig(external_source_format)
hive_partitioning = bigquery.HivePartitioningOptions()
hive_partitioning.mode = "AUTO"
hive_partitioning.source_uri_prefix = f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/"
hive_partitioning.source_uri_prefix = (
f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/"
)
external_config.source_uris = source_uris
external_config.hive_partitioning = hive_partitioning
table = bigquery.Table(bq_table_id)
table.external_data_configuration = external_config
table = client.create_table(table)
table = client.create_table(table)
print(
f"Created table with external source format {table.external_data_configuration.source_format}"
)
Expand Down Expand Up @@ -88,17 +93,23 @@ def extract_links_and_dates(url) -> pd.DataFrame:
df["desatualizado"] = df["data_hoje"] == df["ultima_atualizacao"]
return df


@task
def check_for_updates(df):
"""
Checks for outdated tables.
"""

return df.query("desatualizado == False").arquivo.to_list()

return df.query("desatualizado == False").arquivo.to_list()


@task # noqa
def download_unzip_csv(
files,url: str = "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/", chunk_size: int = 128, mkdir: bool = True, id="raw",
files,
url: str = "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/",
chunk_size: int = 128,
mkdir: bool = True,
id="raw",
) -> str:
"""
Downloads and unzips a .csv file from a given list of files and saves it to a local directory.
Expand Down Expand Up @@ -154,14 +165,21 @@ def download_unzip_csv(

return f"/tmp/data/br_cvm_fii/{id}/input/"


@task
def make_partitions(path: str):
logger = get_run_logger()
files = os.listdir(path)
for i in ['geral', 'complemento', 'ativo_passivo']: os.makedirs(f"/tmp/data/br_cvm_fii/{i}", exist_ok=True)
for i in ["geral", "complemento", "ativo_passivo"]:
os.makedirs(f"/tmp/data/br_cvm_fii/{i}", exist_ok=True)
ROOT_DIR = "/tmp/data/br_cvm_fii/"
for file in files:
df = pd.read_csv(f'{ROOT_DIR}/raw/input/{file}', sep=";", encoding="ISO-8859-1", dtype="string")
df = pd.read_csv(
f"{ROOT_DIR}/raw/input/{file}",
sep=";",
encoding="ISO-8859-1",
dtype="string",
)
partition_name = None
if "_geral_" in file:
partition_name = "geral"
Expand All @@ -173,18 +191,27 @@ def make_partitions(path: str):
if partition_name:
pq.write_to_dataset(
table=pa.Table.from_pandas(df),
root_path=f'{ROOT_DIR}{partition_name}',
root_path=f"{ROOT_DIR}{partition_name}",
partition_cols=["Data_Referencia"],
basename_template="{i}_data.parquet"
)
basename_template="{i}_data.parquet",
)


@task
def make_parquet_schema():
for t in ['geral','complemento', 'ativo_passivo']:
table = pq.read_table(f'/tmp/data/br_cvm_fii/{t}/Data_Referencia=2019-12-01/0_data.parquet')
for t in ["geral", "complemento", "ativo_passivo"]:
table = pq.read_table(
f"/tmp/data/br_cvm_fii/{t}/Data_Referencia=2019-12-01/0_data.parquet"
)
schema = table.schema
schema_df = pd.DataFrame({'column_name': schema.names, 'data_type': [str(dtype) for dtype in schema.types]})
schema_df.to_csv(f'/tmp/data/br_cvm_fii/schema_{t}.csv', index=False)
schema_df = pd.DataFrame(
{
"column_name": schema.names,
"data_type": [str(dtype) for dtype in schema.types],
}
)
schema_df.to_csv(f"/tmp/data/br_cvm_fii/schema_{t}.csv", index=False)


@task
def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8):
Expand All @@ -208,47 +235,61 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, worker

relative_paths = [path.relative_to(source_directory) for path in file_paths]
string_paths = [str(path) for path in relative_paths]

logger.info("Found {} files.".format(len(string_paths)))
results = transfer_manager.upload_many_from_filenames(
bucket, string_paths, source_directory=source_directory, max_workers=workers, skip_if_exists=True
bucket,
string_paths,
source_directory=source_directory,
max_workers=workers,
skip_if_exists=True,
)

for name, result in zip(string_paths, results):
if isinstance(result, Exception):
logger.info("Failed to upload {} due to exception: {}".format(name, result))


@task
def create_staging_tables():
for table in ['geral','complemento', 'ativo_passivo']:
for table in ["geral", "complemento", "ativo_passivo"]:
logger = get_run_logger()
logger.info(f"Criando tabela {table}:")
create_bigquery_table(table_id=table)
logger.info(f"Tabela {table} criada!")


@task
def run_dbt_model():
logger = get_run_logger()
logger.info("Triggering dbt models:")
result = DbtCoreOperation(
commands=["dbt run"],
project_dir="/opt/prefect/prefect-cloud-run-poc/queries",
profiles_dir="/opt/prefect/prefect-cloud-run-poc/queries"
profiles_dir="/opt/prefect/prefect-cloud-run-poc/queries",
).run()
return result


@flow()
def brazilian_reits():
logger = get_run_logger()
links_and_dates = extract_links_and_dates(url= "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/")
files = check_for_updates(df = links_and_dates)
links_and_dates = extract_links_and_dates(
url="https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/"
)
files = check_for_updates(df=links_and_dates)
logger.info(f" tamanho --> {len(files)}")
if len(files) == 0:
logger.info("Não houveram atualizações, encerrando o flow")
else:
path = download_unzip_csv(files = files)
make_partitions(path = path)
upload_directory_with_transfer_manager(bucket_name="brazilian-reits-bucket", source_directory="/tmp/data/")
path = download_unzip_csv(files=files)
make_partitions(path=path)
upload_directory_with_transfer_manager(
bucket_name="brazilian-reits-bucket", source_directory="/tmp/data/"
)
create_staging_tables()
run_dbt_model()


if __name__ == "__main__":
brazilian_reits()
brazilian_reits()
Loading

0 comments on commit 44847f2

Please sign in to comment.