From 3ba0edf25a1004f83351f3b5a8d75200b75d1514 Mon Sep 17 00:00:00 2001 From: JosephKevinMachado Date: Thu, 13 Jun 2024 13:38:58 -0400 Subject: [PATCH] 2024-06-13-13-38-58 - add-tests --- containers/airflow/requirements.txt | 2 ++ dags/coincap_elt.py | 23 ++++++++++++++++++++++- visualization/dashboard.html | 4 ++-- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/containers/airflow/requirements.txt b/containers/airflow/requirements.txt index 30ae22f..1591433 100755 --- a/containers/airflow/requirements.txt +++ b/containers/airflow/requirements.txt @@ -11,3 +11,5 @@ duckdb==1.0.0 plotly==5.22.0 jupyter==1.0.0 types-requests==2.32.0.20240602 +cuallee==0.10.3 +polars==0.20.31 diff --git a/dags/coincap_elt.py b/dags/coincap_elt.py index 9dce84b..07b5ee6 100755 --- a/dags/coincap_elt.py +++ b/dags/coincap_elt.py @@ -7,6 +7,9 @@ from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator +from cuallee import Check, CheckLevel +import polars as pl +from airflow.operators.dummy import DummyOperator with DAG( 'coincap_elt', @@ -32,6 +35,23 @@ def fetch_coincap_exchanges(url, file_path): dict_writer.writeheader() dict_writer.writerows(exchanges) + def check_completeness(pl_df, column_name): + check = Check(CheckLevel.ERROR, "Completeness") + validation_results_df = ( + check.is_complete(column_name).validate(pl_df) + ) + return validation_results_df["status"].to_list() + + @task.branch + def check_data_quality(validation_results): + if "FAIL" not in validation_results: + return ['generate_dashboard'] + return ['stop_pipeline'] + + check_data_quality_instance = check_data_quality(check_completeness(pl.read_csv(file_path), "name")) + + stop_pipeline = DummyOperator(task_id='stop_pipeline') + markdown_path = f'{os.getenv("AIRFLOW_HOME")}/visualization/' q_cmd = ( f'cd {markdown_path} && quarto render {markdown_path}/dashboard.qmd' @@ -40,4 +60,5 @@ def fetch_coincap_exchanges(url, file_path): task_id="generate_dashboard", bash_command=q_cmd ) - fetch_coincap_exchanges(url, file_path) >> gen_dashboard + fetch_coincap_exchanges(url, file_path) >> check_data_quality_instance >> gen_dashboard + fetch_coincap_exchanges(url, file_path) >> check_data_quality_instance >> stop_pipeline diff --git a/visualization/dashboard.html b/visualization/dashboard.html index 0827b73..954bab7 100755 --- a/visualization/dashboard.html +++ b/visualization/dashboard.html @@ -100,9 +100,9 @@
Coincap Exchange data analysis
-