diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..fb1b1b1
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,20 @@
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v4.6.0
+    hooks:
+      - id: check-added-large-files # prevents adding large files
+      - id: detect-private-key # detects private keys
+      - id: fix-byte-order-marker # fixes BOM
+      - id: fix-encoding-pragma # fixes encoding pragma
+      - id: no-commit-to-branch # prevents committing to protected branches
+      - id: trailing-whitespace # prevents trailing whitespace
+  - repo: https://github.com/python-poetry/poetry
+    rev: 1.8.0
+    hooks:
+      - id: poetry-check
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    rev: v0.3.7
+    hooks:
+      - id: ruff
+        args: [--fix]
+      - id: ruff-format
\ No newline at end of file
diff --git a/flows/brazilian_reits/brazilian_reits.py b/flows/brazilian_reits/brazilian_reits.py
index 6771cf7..3fd1eb0 100644
--- a/flows/brazilian_reits/brazilian_reits.py
+++ b/flows/brazilian_reits/brazilian_reits.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 from prefect import flow, task, get_run_logger
 import requests
 from bs4 import BeautifulSoup
@@ -14,8 +15,10 @@
 from google.cloud import bigquery
 from pathlib import Path
 from google.cloud.storage import Client, transfer_manager
+
 os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./dbt-service-account.json"
 
+
 def create_bigquery_table(table_id, overwrite=True) -> None:
     """
     Creates a BigQuery table with external Parquet data source.
@@ -24,12 +27,12 @@ def create_bigquery_table(table_id, overwrite=True) -> None:
         table_id (str): The identifier of the table to be created in BigQuery.
         overwrite (bool, optional): If True, the existing table with the same `table_id` will be overwritten.
             If False and the table already exists, the function does nothing. Default is True.
-    """
-
-    source_uris = [
-        f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/*"
-    ]
-    bq_table_id = f"arthur-data-engineering-course.brazilian_reits_staging.{table_id}_staging"
+    """
+
+    source_uris = [f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/*"]
+    bq_table_id = (
+        f"arthur-data-engineering-course.brazilian_reits_staging.{table_id}_staging"
+    )
     client = bigquery.Client()
     if overwrite:
         client.delete_table(bq_table_id, not_found_ok=True)  # Make an API request.
@@ -39,12 +42,14 @@ def create_bigquery_table(table_id, overwrite=True) -> None:
     external_config = bigquery.ExternalConfig(external_source_format)
     hive_partitioning = bigquery.HivePartitioningOptions()
     hive_partitioning.mode = "AUTO"
-    hive_partitioning.source_uri_prefix = f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/"
+    hive_partitioning.source_uri_prefix = (
+        f"gs://brazilian-reits-bucket/br_cvm_fii/{table_id}/"
+    )
     external_config.source_uris = source_uris
     external_config.hive_partitioning = hive_partitioning
     table = bigquery.Table(bq_table_id)
     table.external_data_configuration = external_config
-    table = client.create_table(table)
+    table = client.create_table(table)
     print(
         f"Created table with external source format {table.external_data_configuration.source_format}"
     )
@@ -88,17 +93,23 @@ def extract_links_and_dates(url) -> pd.DataFrame:
     df["desatualizado"] = df["data_hoje"] == df["ultima_atualizacao"]
     return df
 
+
 @task
 def check_for_updates(df):
     """
     Checks for outdated tables.
     """
-
-    return df.query("desatualizado == False").arquivo.to_list()
+
+    return df.query("desatualizado == False").arquivo.to_list()
+
 
 @task # noqa
 def download_unzip_csv(
-    files,url: str = "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/", chunk_size: int = 128, mkdir: bool = True, id="raw",
+    files,
+    url: str = "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/",
+    chunk_size: int = 128,
+    mkdir: bool = True,
+    id="raw",
 ) -> str:
     """
     Downloads and unzips a .csv file from a given list of files and saves it to a local directory.
@@ -154,14 +165,21 @@ def download_unzip_csv(
     return f"/tmp/data/br_cvm_fii/{id}/input/"
 
 
+
 @task
 def make_partitions(path: str):
     logger = get_run_logger()
     files = os.listdir(path)
-    for i in ['geral', 'complemento', 'ativo_passivo']: os.makedirs(f"/tmp/data/br_cvm_fii/{i}", exist_ok=True)
+    for i in ["geral", "complemento", "ativo_passivo"]:
+        os.makedirs(f"/tmp/data/br_cvm_fii/{i}", exist_ok=True)
     ROOT_DIR = "/tmp/data/br_cvm_fii/"
     for file in files:
-        df = pd.read_csv(f'{ROOT_DIR}/raw/input/{file}', sep=";", encoding="ISO-8859-1", dtype="string")
+        df = pd.read_csv(
+            f"{ROOT_DIR}/raw/input/{file}",
+            sep=";",
+            encoding="ISO-8859-1",
+            dtype="string",
+        )
         partition_name = None
         if "_geral_" in file:
             partition_name = "geral"
@@ -173,18 +191,27 @@ def make_partitions(path: str):
         if partition_name:
             pq.write_to_dataset(
                 table=pa.Table.from_pandas(df),
-                root_path=f'{ROOT_DIR}{partition_name}',
+                root_path=f"{ROOT_DIR}{partition_name}",
                 partition_cols=["Data_Referencia"],
-                basename_template="{i}_data.parquet"
-            )
+                basename_template="{i}_data.parquet",
+            )
+
 
 @task
 def make_parquet_schema():
-    for t in ['geral','complemento', 'ativo_passivo']:
-        table = pq.read_table(f'/tmp/data/br_cvm_fii/{t}/Data_Referencia=2019-12-01/0_data.parquet')
+    for t in ["geral", "complemento", "ativo_passivo"]:
+        table = pq.read_table(
+            f"/tmp/data/br_cvm_fii/{t}/Data_Referencia=2019-12-01/0_data.parquet"
+        )
         schema = table.schema
-        schema_df = pd.DataFrame({'column_name': schema.names, 'data_type': [str(dtype) for dtype in schema.types]})
-        schema_df.to_csv(f'/tmp/data/br_cvm_fii/schema_{t}.csv', index=False)
+        schema_df = pd.DataFrame(
+            {
+                "column_name": schema.names,
+                "data_type": [str(dtype) for dtype in schema.types],
+            }
+        )
+        schema_df.to_csv(f"/tmp/data/br_cvm_fii/schema_{t}.csv", index=False)
+
 
 @task
 def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8):
@@ -208,24 +235,30 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, worke
     relative_paths = [path.relative_to(source_directory) for path in file_paths]
     string_paths = [str(path) for path in relative_paths]
-
+
     logger.info("Found {} files.".format(len(string_paths)))
 
     results = transfer_manager.upload_many_from_filenames(
-        bucket, string_paths, source_directory=source_directory, max_workers=workers, skip_if_exists=True
+        bucket,
+        string_paths,
+        source_directory=source_directory,
+        max_workers=workers,
+        skip_if_exists=True,
    )
 
     for name, result in zip(string_paths, results):
         if isinstance(result, Exception):
             logger.info("Failed to upload {} due to exception: {}".format(name, result))
 
+
 @task
 def create_staging_tables():
-    for table in ['geral','complemento', 'ativo_passivo']:
+    for table in ["geral", "complemento", "ativo_passivo"]:
         logger = get_run_logger()
         logger.info(f"Criando tabela {table}:")
         create_bigquery_table(table_id=table)
         logger.info(f"Tabela {table} criada!")
 
+
 @task
 def run_dbt_model():
     logger = get_run_logger()
@@ -233,22 +266,30 @@ def run_dbt_model():
     result = DbtCoreOperation(
         commands=["dbt run"],
         project_dir="/opt/prefect/prefect-cloud-run-poc/queries",
-        profiles_dir="/opt/prefect/prefect-cloud-run-poc/queries"
+        profiles_dir="/opt/prefect/prefect-cloud-run-poc/queries",
     ).run()
+    return result
+
 
 @flow()
 def brazilian_reits():
     logger = get_run_logger()
-    links_and_dates = extract_links_and_dates(url= "https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/")
-    files = check_for_updates(df = links_and_dates)
+    links_and_dates = extract_links_and_dates(
+        url="https://dados.cvm.gov.br/dados/FII/DOC/INF_MENSAL/DADOS/"
+    )
+    files = check_for_updates(df=links_and_dates)
     logger.info(f" tamanho --> {len(files)}")
     if len(files) == 0:
         logger.info("Não houveram atualizações, encerrando o flow")
     else:
-        path = download_unzip_csv(files = files)
-        make_partitions(path = path)
-        upload_directory_with_transfer_manager(bucket_name="brazilian-reits-bucket", source_directory="/tmp/data/")
+        path = download_unzip_csv(files=files)
+        make_partitions(path=path)
+        upload_directory_with_transfer_manager(
+            bucket_name="brazilian-reits-bucket", source_directory="/tmp/data/"
+        )
         create_staging_tables()
         run_dbt_model()
+
+
 if __name__ == "__main__":
-    brazilian_reits()
\ No newline at end of file
+    brazilian_reits()
diff --git a/poetry.lock b/poetry.lock
index 52856d0..b421500 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -443,6 +443,17 @@ files = [
 [package.dependencies]
 pycparser = "*"
 
+[[package]]
+name = "cfgv"
+version = "3.4.0"
+description = "Validate configuration and produce human readable error messages."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "cfgv-3.4.0-py2.py3-none-any.whl", hash = "sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9"},
+    {file = "cfgv-3.4.0.tar.gz", hash = "sha256:e52591d4c5f5dead8e0f673fb16db7949d2cfb3f7da4582893288f0ded8fe560"},
+]
+
 [[package]]
 name = "charset-normalizer"
 version = "3.3.2"
@@ -809,6 +820,17 @@ files = [
     {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"},
 ]
 
+[[package]]
+name = "distlib"
+version = "0.3.8"
+description = "Distribution utilities"
+optional = false
+python-versions = "*"
+files = [
+    {file = "distlib-0.3.8-py2.py3-none-any.whl", hash = "sha256:034db59a0b96f8ca18035f36290806a9a6e6bd9d1ff91e45a7f172eb17e51784"},
+    {file = "distlib-0.3.8.tar.gz", hash = "sha256:1530ea13e350031b6312d8580ddb6b27a104275a31106523b8f123787f494f64"},
+]
+
 [[package]]
 name = "dnspython"
 version = "2.6.1"
@@ -879,6 +901,22 @@ files = [
 [package.extras]
 test = ["pytest (>=6)"]
 
+[[package]]
+name = "filelock"
+version = "3.13.4"
+description = "A platform independent file lock."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "filelock-3.13.4-py3-none-any.whl", hash = "sha256:404e5e9253aa60ad457cae1be07c0f0ca90a63931200a47d9b6a6af84fd7b45f"},
+    {file = "filelock-3.13.4.tar.gz", hash = "sha256:d13f466618bfde72bd2c18255e269f72542c6e70e7bac83a0232d6b1cc5c8cf4"},
+]
+
+[package.extras]
+docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"]
+testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"]
+typing = ["typing-extensions (>=4.8)"]
+
 [[package]]
 name = "frozenlist"
 version = "1.4.1"
@@ -1679,6 +1717,20 @@ files = [
     {file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"},
 ]
 
+[[package]]
+name = "identify"
+version = "2.5.35"
+description = "File identification library for Python"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "identify-2.5.35-py2.py3-none-any.whl", hash = "sha256:c4de0081837b211594f8e877a6b4fad7ca32bbfc1a9307fdd61c28bfe923f13e"},
+    {file = "identify-2.5.35.tar.gz", hash = "sha256:10a7ca245cfcd756a554a7288159f72ff105ad233c7c4b9c6f0f4d108f5f6791"},
+]
+
+[package.extras]
+license = ["ukkonen"]
+
 [[package]]
 name = "idna"
 version = "3.6"
@@ -2261,6 +2313,20 @@ doc = ["myst-nb (>=1.0)", "numpydoc (>=1.7)", "pillow (>=9.4)", "pydata-sphinx-t
 extra = ["lxml (>=4.6)", "pydot (>=2.0)", "pygraphviz (>=1.12)", "sympy (>=1.10)"]
 test = ["pytest (>=7.2)", "pytest-cov (>=4.0)"]
 
+[[package]]
+name = "nodeenv"
+version = "1.8.0"
+description = "Node.js virtual environment builder"
+optional = false
+python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*"
+files = [
+    {file = "nodeenv-1.8.0-py2.py3-none-any.whl", hash = "sha256:df865724bb3c3adc86b3876fa209771517b0cfe596beff01a92700e0e8be4cec"},
+    {file = "nodeenv-1.8.0.tar.gz", hash = "sha256:d51e0c37e64fbf47d017feac3145cdbb58836d7eee8c6f6d3b6880c5456227d2"},
+]
+
+[package.dependencies]
+setuptools = "*"
+
 [[package]]
 name = "numpy"
 version = "1.26.4"
@@ -2621,6 +2687,39 @@ tzdata = ">=2020.1"
 [package.extras]
 test = ["time-machine (>=2.6.0)"]
 
+[[package]]
+name = "platformdirs"
+version = "4.2.0"
+description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "platformdirs-4.2.0-py3-none-any.whl", hash = "sha256:0614df2a2f37e1a662acbd8e2b25b92ccf8632929bc6d43467e17fe89c75e068"},
+    {file = "platformdirs-4.2.0.tar.gz", hash = "sha256:ef0cc731df711022c174543cb70a9b5bd22e5a9337c8624ef2c2ceb8ddad8768"},
+]
+
+[package.extras]
+docs = ["furo (>=2023.9.10)", "proselint (>=0.13)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"]
+test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)"]
+
+[[package]]
+name = "pre-commit"
+version = "3.7.0"
+description = "A framework for managing and maintaining multi-language pre-commit hooks."
+optional = false
+python-versions = ">=3.9"
+files = [
+    {file = "pre_commit-3.7.0-py2.py3-none-any.whl", hash = "sha256:5eae9e10c2b5ac51577c3452ec0a490455c45a0533f7960f993a0d01e59decab"},
+    {file = "pre_commit-3.7.0.tar.gz", hash = "sha256:e209d61b8acdcf742404408531f0c37d49d2c734fd7cff2d6076083d191cb060"},
+]
+
+[package.dependencies]
+cfgv = ">=2.0.0"
+identify = ">=1.0.0"
+nodeenv = ">=0.11.1"
+pyyaml = ">=5.1"
+virtualenv = ">=20.10.0"
+
 [[package]]
 name = "prefect"
 version = "2.16.9"
@@ -3995,6 +4094,26 @@ typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""}
 [package.extras]
 standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
 
+[[package]]
+name = "virtualenv"
+version = "20.25.1"
+description = "Virtual Python Environment builder"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "virtualenv-20.25.1-py3-none-any.whl", hash = "sha256:961c026ac520bac5f69acb8ea063e8a4f071bcc9457b9c1f28f6b085c511583a"},
+    {file = "virtualenv-20.25.1.tar.gz", hash = "sha256:e08e13ecdca7a0bd53798f356d5831434afa5b07b93f0abdf0797b7a06ffe197"},
+]
+
+[package.dependencies]
+distlib = ">=0.3.7,<1"
+filelock = ">=3.12.2,<4"
+platformdirs = ">=3.9.1,<5"
+
+[package.extras]
+docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"]
+test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"]
+
 [[package]]
 name = "websocket-client"
 version = "1.7.0"
@@ -4213,4 +4332,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.10"
-content-hash = "f99713c9c5a714f195a8c5631e4ac4dd159186e61a030e5dd5abf3146942acb6"
+content-hash = "c6306fb03c59dc39e8d2a3a1e0825ce180d604acc4661c1bb6d4bd17ac6dfcb8"
diff --git a/pyproject.toml b/pyproject.toml
index 8c14b28..8576708 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@ dbt-bigquery = "^1.7.7"
 prefect-dbt = {extras = ["bigquery"], version = "^0.4.1"}
 beautifulsoup4 = "^4.12.3"
 tqdm = "^4.66.2"
+pre-commit = "^3.7.0"
 
 [build-system]
 requires = ["poetry-core"]