From bc1dc3e203e223270f887b0aa29207c4b4136823 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 3 Jan 2025 14:59:03 -0500 Subject: [PATCH] Ruff ify (#139) --- .github/workflows/deploy-dagster-cloud.yml | 8 +- Makefile | 3 + hooli-bi/hooli_bi/__init__.py | 2 +- hooli-bi/hooli_bi/powerbi_assets.py | 53 +++++----- hooli-bi/hooli_bi/powerbi_workspace.py | 2 +- .../hooli_data_ingest/__init__.py | 2 +- .../hooli_data_ingest/assets/sling.py | 35 +++---- .../hooli_data_ingest/definitions.py | 23 ++--- .../hooli_data_ingest/jobs/__init__.py | 4 +- .../hooli_data_ingest/resources/__init__.py | 96 +++++++++---------- .../hooli_data_ingest/schedules/__init__.py | 4 +- hooli_basics/definitions.py | 48 ++++++---- .../dagster_batch_enrichment/__init__.py | 2 +- .../dagster_batch_enrichment/api.py | 5 +- .../dagster_batch_enrichment/assets.py | 53 ++++++---- .../dagster_batch_enrichment/definitions.py | 14 ++- .../dagster_batch_enrichment/warehouse.py | 6 +- hooli_data_eng/__init__.py | 2 +- hooli_data_eng/assets/__init__.py | 1 - hooli_data_eng/assets/dbt_assets.py | 19 ++-- hooli_data_eng/assets/forecasting/__init__.py | 16 ++-- hooli_data_eng/assets/forecasting/model.ipynb | 44 +++++---- hooli_data_eng/assets/marketing/__init__.py | 26 +++-- hooli_data_eng/assets/raw_data/__init__.py | 48 +++++----- hooli_data_eng/blueprints/blueprints.py | 4 +- hooli_data_eng/definitions.py | 74 +++++++++----- hooli_data_eng/jobs/__init__.py | 5 +- hooli_data_eng/resources/api.py | 41 ++++---- hooli_data_eng/resources/databricks.py | 56 ++++++----- hooli_data_eng/resources/dbt.py | 9 +- hooli_data_eng/resources/warehouse.py | 19 ++-- hooli_data_eng/schedules/__init__.py | 2 +- hooli_data_eng/sensors/__init__.py | 27 ++++-- hooli_data_eng/sensors/touch_s3_file.py | 16 ++-- hooli_data_eng/sensors/watch_s3.py | 4 +- hooli_data_eng/utils/__init__.py | 40 +++++--- hooli_data_eng/utils/dbt_code_version.py | 7 +- hooli_data_eng/utils/kind_helpers.py | 7 +- hooli_data_eng_tests/test_assets.py | 40 ++++---- hooli_snowflake_insights/definitions.py | 14 ++- pyproject.toml | 1 + 41 files changed, 489 insertions(+), 393 deletions(-) diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index e11b78ce..05e26ff7 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -32,6 +32,7 @@ jobs: if: steps.prerun.outputs.result != 'skip' with: ref: ${{ github.head_ref }} + - name: Get changed files id: changed-files uses: tj-actions/changed-files@v45 @@ -107,7 +108,12 @@ jobs: run: | uv venv source .venv/bin/activate - uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade; + uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb "dbt-snowflake<=1.8.4" --upgrade; + + - name: Run Ruff + uses: astral-sh/ruff-action@v3 + with: + args: check --fix --output-format=github . - name: Validate configuration id: ci-validate diff --git a/Makefile b/Makefile index 834744f7..88b9b900 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,9 @@ update_packages: uv lock --upgrade --directory hooli-data-ingest; uv lock --upgrade --directory hooli-bi; +ruff: + -ruff check --fix . + ruff format . 
# ensure that DAGSTER_GIT_REPO_DIR is set to the path of the dagster repo # see https://www.notion.so/dagster/Local-Dev-Setup-e58aba352f704dcc88a8dc44cb1ce7fc for more details diff --git a/hooli-bi/hooli_bi/__init__.py b/hooli-bi/hooli_bi/__init__.py index 6f9274b2..7be1ee3c 100644 --- a/hooli-bi/hooli_bi/__init__.py +++ b/hooli-bi/hooli_bi/__init__.py @@ -1 +1 @@ -from hooli_bi.definitions import defs as defs \ No newline at end of file +from hooli_bi.definitions import defs as defs diff --git a/hooli-bi/hooli_bi/powerbi_assets.py b/hooli-bi/hooli_bi/powerbi_assets.py index 5c102f0c..95a03d85 100644 --- a/hooli-bi/hooli_bi/powerbi_assets.py +++ b/hooli-bi/hooli_bi/powerbi_assets.py @@ -8,6 +8,7 @@ from dagster_powerbi.translator import PowerBIContentData from hooli_bi.powerbi_workspace import power_bi_workspace + class MyCustomPowerBITranslator(DagsterPowerBITranslator): def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_report_spec(data) @@ -16,44 +17,52 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: description=f"Report link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/reports/{data.properties["id"]}", group_name="BI", ) - return merge_attributes(specs_replaced, tags={"core_kpis":""}) + return merge_attributes(specs_replaced, tags={"core_kpis": ""}) - def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_semantic_model_spec(data) spec_replaced = replace_attributes( spec, description=f"Semantic model link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/datasets/{data.properties["id"]}/details", group_name="BI", - deps=[AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) for dep in spec.deps], + deps=[ + AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) + for dep in spec.deps + ], ) - return merge_attributes(spec_replaced, - metadata={"dagster/column_schema": TableSchema( - columns=[TableColumn( - name=col["name"], - type=col["dataType"], - tags={"PII":""} if col["name"] == "USER_ID" else None) - for col in data.properties["tables"][0]["columns"]])}, - tags={"core_kpis":""}, + return merge_attributes( + spec_replaced, + metadata={ + "dagster/column_schema": TableSchema( + columns=[ + TableColumn( + name=col["name"], + type=col["dataType"], + tags={"PII": ""} if col["name"] == "USER_ID" else None, + ) + for col in data.properties["tables"][0]["columns"] + ] + ) + }, + tags={"core_kpis": ""}, ) def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_dashboard_spec(data) - return replace_attributes( - spec, - group_name="BI" - ) - + return replace_attributes(spec, group_name="BI") + def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_data_source_spec(data) - return replace_attributes( - spec, - group_name="BI" - ) + return replace_attributes(spec, group_name="BI") + powerbi_assets = [ build_semantic_model_refresh_asset_definition(resource_key="power_bi", spec=spec) if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model" else spec for spec in load_powerbi_asset_specs( - power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator, use_workspace_scan=True) -] \ No newline at end of file + power_bi_workspace, + dagster_powerbi_translator=MyCustomPowerBITranslator, + use_workspace_scan=True, + ) +] diff --git 
a/hooli-bi/hooli_bi/powerbi_workspace.py b/hooli-bi/hooli_bi/powerbi_workspace.py index afc35516..0f745336 100644 --- a/hooli-bi/hooli_bi/powerbi_workspace.py +++ b/hooli-bi/hooli_bi/powerbi_workspace.py @@ -9,4 +9,4 @@ tenant_id=EnvVar("AZURE_POWERBI_TENANT_ID"), ), workspace_id=EnvVar("AZURE_POWERBI_WORKSPACE_ID"), -) \ No newline at end of file +) diff --git a/hooli-data-ingest/hooli_data_ingest/__init__.py b/hooli-data-ingest/hooli_data_ingest/__init__.py index 4be0ed30..c5449e08 100644 --- a/hooli-data-ingest/hooli_data_ingest/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/__init__.py @@ -1 +1 @@ -from hooli_data_ingest.definitions import defs as defs \ No newline at end of file +from hooli_data_ingest.definitions import defs as defs diff --git a/hooli-data-ingest/hooli_data_ingest/assets/sling.py b/hooli-data-ingest/hooli_data_ingest/assets/sling.py index 752110ab..b6b247c4 100644 --- a/hooli-data-ingest/hooli_data_ingest/assets/sling.py +++ b/hooli-data-ingest/hooli_data_ingest/assets/sling.py @@ -1,6 +1,6 @@ from dagster_embedded_elt.sling import ( - sling_assets, - SlingResource, + sling_assets, + SlingResource, ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator @@ -8,26 +8,27 @@ class CustomSlingTranslator(DagsterSlingTranslator): - def __init__(self, target_prefix="RAW_DATA"): + def __init__(self, target_prefix="RAW_DATA"): super().__init__(target_prefix=target_prefix) self.replication_config = replication_config - - def get_group_name(self, stream_definition): - return "RAW_DATA" - - def get_kinds(self, stream_definition): - storage_kind = self.replication_config.get("target", "DUCKDB") - if storage_kind.startswith("SNOWFLAKE"): + + def get_group_name(self, stream_definition): + return "RAW_DATA" + + def get_kinds(self, stream_definition): + storage_kind = self.replication_config.get("target", "DUCKDB") + if storage_kind.startswith("SNOWFLAKE"): storage_kind = "SNOWFLAKE" - return {"sling",storage_kind} - + return {"sling", storage_kind} @sling_assets( - replication_config=replication_config, - dagster_sling_translator=CustomSlingTranslator(), + replication_config=replication_config, + dagster_sling_translator=CustomSlingTranslator(), ) def my_sling_assets(context, sling: SlingResource): - yield from sling.replicate(context=context).fetch_column_metadata().fetch_row_count() - for row in sling.stream_raw_logs(): - context.log.info(row) + yield from ( + sling.replicate(context=context).fetch_column_metadata().fetch_row_count() + ) + for row in sling.stream_raw_logs(): + context.log.info(row) diff --git a/hooli-data-ingest/hooli_data_ingest/definitions.py b/hooli-data-ingest/hooli_data_ingest/definitions.py index def2e3e5..4f67fade 100644 --- a/hooli-data-ingest/hooli_data_ingest/definitions.py +++ b/hooli-data-ingest/hooli_data_ingest/definitions.py @@ -1,11 +1,10 @@ from pathlib import Path from dagster import ( - AnchorBasedFilePathMapping, - Definitions, - with_source_code_references, + AnchorBasedFilePathMapping, + Definitions, + with_source_code_references, ) -from dagster._core.definitions.metadata import with_source_code_references from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from hooli_data_ingest.assets.sling import my_sling_assets @@ -15,16 +14,14 @@ defs = Definitions( - assets=link_code_references_to_git_if_cloud( - with_source_code_references([my_sling_assets]), - file_path_mapping=AnchorBasedFilePathMapping( + assets=link_code_references_to_git_if_cloud( + 
with_source_code_references([my_sling_assets]), + file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py", ), - ), - schedules=[daily_sling_assets], - jobs=[daily_sling_job], - resources={ - "sling": sling_resource - }, + ), + schedules=[daily_sling_assets], + jobs=[daily_sling_job], + resources={"sling": sling_resource}, ) diff --git a/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py b/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py index c7f8e793..ee9e1893 100644 --- a/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py @@ -5,6 +5,6 @@ daily_sling_job = define_asset_job( - name="daily_sling_job", - selection=raw_location_by_day, + name="daily_sling_job", + selection=raw_location_by_day, ) diff --git a/hooli-data-ingest/hooli_data_ingest/resources/__init__.py b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py index d8f33b84..3915886b 100644 --- a/hooli-data-ingest/hooli_data_ingest/resources/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py @@ -3,16 +3,18 @@ from dagster import EnvVar from dagster_embedded_elt.sling import ( - SlingResource, - SlingConnectionResource, + SlingResource, + SlingConnectionResource, ) + def get_env(): - if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": - return "BRANCH" - if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": - return "PROD" - return "LOCAL" + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": + return "BRANCH" + if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": + return "PROD" + return "LOCAL" + # Paths for local dev current_file_path = Path(__file__) @@ -23,25 +25,19 @@ def get_env(): if get_env() == "LOCAL": - replication_config = { - "source": "LOCAL", - "target": "DUCKDB", - "defaults": { - "mode": "full-refresh", - "object": "{stream_file_folder}.{stream_file_name}", - "source_options": { - "format": "csv" - } - }, - "streams": { - LOCATIONS_CSV_PATH: { - "object": "locations" - } - } - } + replication_config = { + "source": "LOCAL", + "target": "DUCKDB", + "defaults": { + "mode": "full-refresh", + "object": "{stream_file_folder}.{stream_file_name}", + "source_options": {"format": "csv"}, + }, + "streams": {LOCATIONS_CSV_PATH: {"object": "locations"}}, + } - sling_resource = SlingResource( - connections=[ + sling_resource = SlingResource( + connections=[ SlingConnectionResource( name="Local", type="local", @@ -55,7 +51,7 @@ def get_env(): schema="raw_data", ), ] - ) + ) if get_env() != "LOCAL": replication_config = { @@ -64,36 +60,32 @@ def get_env(): "defaults": { "mode": "full-refresh", "object": "{stream_file_folder}.{stream_file_name}", - "source_options": { - "format": "csv" - } + "source_options": {"format": "csv"}, }, "streams": { - "s3://hooli-demo/embedded-elt/locations.csv": { - "object": "locations" - } - } + "s3://hooli-demo/embedded-elt/locations.csv": {"object": "locations"} + }, } sling_resource = SlingResource( connections=[ - SlingConnectionResource( - name="S3", - type="s3", - bucket=EnvVar("AWS_S3_BUCKET"), - region=EnvVar("AWS_REGION"), - access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), - secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), - ), - SlingConnectionResource( - name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH", - type="snowflake", - host=EnvVar("SNOWFLAKE_HOST"), - user=EnvVar("SNOWFLAKE_USER"), - 
password=EnvVar("SNOWFLAKE_PASSWORD"), - role=EnvVar("SNOWFLAKE_ROLE"), - database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH", - schema="RAW_DATA", - ) + SlingConnectionResource( + name="S3", + type="s3", + bucket=EnvVar("AWS_S3_BUCKET"), + region=EnvVar("AWS_REGION"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), + ), + SlingConnectionResource( + name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + role=EnvVar("SNOWFLAKE_ROLE"), + database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH", + schema="RAW_DATA", + ), ] - ) \ No newline at end of file + ) diff --git a/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py index 8a797469..e2847607 100644 --- a/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py @@ -3,6 +3,6 @@ daily_sling_assets = ScheduleDefinition( - job=daily_sling_job, - cron_schedule="0 0 * * *", # every day at midnight + job=daily_sling_job, + cron_schedule="0 0 * * *", # every day at midnight ) diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index baf08303..7f9329ba 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -12,45 +12,54 @@ from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression + @asset( kinds={"Kubernetes", "S3"}, ) def country_stats() -> DataFrame: - df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0] - df.columns = ["country", "Population (1 July 2022)", "Population (1 July 2023)", "Change", "UN Continental Region[1]", "UN Statistical Subregion[1]"] + df = read_html("https://tinyurl.com/mry64ebh", flavor="html5lib")[0] + df.columns = [ + "country", + "Population (1 July 2022)", + "Population (1 July 2023)", + "Change", + "UN Continental Region[1]", + "UN Statistical Subregion[1]", + ] df = df.drop(columns=["Change"]) - df = df.rename(columns={ - "UN Continental Region[1]": "continent", - "UN Statistical Subregion[1]": "region", - "Population (1 July 2022)": "pop_2022", - "Population (1 July 2023)": "pop_2023", + df = df.rename( + columns={ + "UN Continental Region[1]": "continent", + "UN Statistical Subregion[1]": "region", + "Population (1 July 2022)": "pop_2022", + "Population (1 July 2023)": "pop_2023", } ) - df["pop_change"] = ((to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1)*100 + df["pop_change"] = ( + (to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1 + ) * 100 return df -@asset_check( - asset=country_stats -) + +@asset_check(asset=country_stats) def check_country_stats(country_stats): return AssetCheckResult(passed=True) -@asset( - kinds={"Kubernetes", "S3"} -) + +@asset(kinds={"Kubernetes", "S3"}) def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) dummies = get_dummies(data[["continent"]]) return Regression().fit(dummies, data["pop_change"]) -@asset( - kinds={"Kubernetes", "S3"} -) + +@asset(kinds={"Kubernetes", "S3"}) def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() result["pop_change_factor"] = change_model.coef_ return result + defs = Definitions( assets=link_code_references_to_git_if_cloud( 
with_source_code_references([country_stats, continent_stats, change_model]), @@ -58,7 +67,6 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF local_file_anchor=Path(__file__), file_anchor_path_in_repository="hooli_basics/definitions.py", ), - ), - asset_checks=[check_country_stats] + ), + asset_checks=[check_country_stats], ) - diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/__init__.py b/hooli_batch_enrichment/dagster_batch_enrichment/__init__.py index 833d04c2..1123b528 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/__init__.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/__init__.py @@ -1 +1 @@ -from dagster_batch_enrichment.definitions import defs as defs \ No newline at end of file +from dagster_batch_enrichment.definitions import defs as defs diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/api.py b/hooli_batch_enrichment/dagster_batch_enrichment/api.py index d76c745a..426ab6a3 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/api.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/api.py @@ -1,6 +1,5 @@ import random -import numpy as np import pandas as pd import requests import responses @@ -12,10 +11,10 @@ class EnrichmentAPI(ConfigurableResource): @responses.activate def get_order_details(_, order_id): - x = random.randint(0,500) + x = random.randint(0, 500) if x <= 1: raise Exception("API time out") - + responses.get( # fake endpoint "http://api.jaffleshop.co/v1/order_details", diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py index 60b94870..198bd7a5 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py @@ -1,6 +1,16 @@ import json -from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config +from dagster import ( + asset, + OpExecutionContext, + MetadataValue, + DynamicOut, + op, + DynamicOutput, + graph_asset, + RetryPolicy, + Config, +) import pandas as pd from pydantic import Field import numpy as np @@ -11,30 +21,33 @@ class experimentConfig(Config): experiment_name: str = Field( - default="default_config", - description="A name to give to this run's configuration set" + default="default_config", + description="A name to give to this run's configuration set", ) + @asset( kinds={"Kubernetes", "S3"}, ) def raw_data( - context: OpExecutionContext, + context: OpExecutionContext, warehouse: MyWarehouse, config: experimentConfig, ): - """ Placeholder for querying a real data source""" + """Placeholder for querying a real data source""" orders_to_process = warehouse.get_raw_data() - + # add any logging context.log.info(f"Received {len(orders_to_process)} orders to process") - + # associate metadata with the raw data asset materialization - context.add_output_metadata(metadata={ - "preview": MetadataValue.md(orders_to_process.head(3).to_markdown()), - "nrows": len(orders_to_process), - "user_input": config.experiment_name - }) + context.add_output_metadata( + metadata={ + "preview": MetadataValue.md(orders_to_process.head(3).to_markdown()), + "nrows": len(orders_to_process), + "user_input": config.experiment_name, + } + ) return orders_to_process @@ -45,7 +58,10 @@ def raw_data( # The batch size is configurable with a default of 50 records per batch # The batches are processed in parallel threads class ParallelizationConfig(Config): - 
number_records_per_batch: int = Field(50, description="Number of records to use per batch") + number_records_per_batch: int = Field( + 50, description="Number of records to use per batch" + ) + @op(out=DynamicOut()) def split_rows(context: OpExecutionContext, raw_data, config: ParallelizationConfig): @@ -60,10 +76,10 @@ def split_rows(context: OpExecutionContext, raw_data, config: ParallelizationCon yield DynamicOutput(c, mapping_key=str(r)) -@op( - retry_policy=RetryPolicy(max_retries=2) -) -def process_chunk(context: OpExecutionContext, chunk, api: EnrichmentAPI) -> pd.DataFrame: +@op(retry_policy=RetryPolicy(max_retries=2)) +def process_chunk( + context: OpExecutionContext, chunk, api: EnrichmentAPI +) -> pd.DataFrame: """ Process rows in each chunk by calling the enrichment API within a chunk processing is sequential @@ -97,6 +113,3 @@ def enriched_data(raw_data) -> pd.DataFrame: chunks_mapped = chunks.map(process_chunk) enriched_chunks = chunks_mapped.collect() return concat_chunk_list(enriched_chunks) - - - diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index 25a51401..a565b084 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -18,14 +18,12 @@ # alternatively could use freshness policies and auto-materialization, partitions, or other ways to orient the schedule run_assets_job = define_asset_job( name="run_etl_pipeline", - selection=AssetSelection.all(), - tags={"dagster/max_retries": "1"} + selection=AssetSelection.all(), + tags={"dagster/max_retries": "1"}, ) run_assets_30min = ScheduleDefinition( - name="run_assets_30min", - job=run_assets_job, - cron_schedule="*/30 * * * *" + name="run_assets_30min", job=run_assets_job, cron_schedule="*/30 * * * *" ) defs = Definitions( @@ -41,6 +39,6 @@ resources={ "api": EnrichmentAPI(), # place holder for a real warehouse with required connection config - "warehouse": MyWarehouse(path="raw_data.csv") - } -) \ No newline at end of file + "warehouse": MyWarehouse(path="raw_data.csv"), + }, +) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/warehouse.py b/hooli_batch_enrichment/dagster_batch_enrichment/warehouse.py index 6356db70..509480f7 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/warehouse.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/warehouse.py @@ -1,9 +1,9 @@ -from typing import Callable from dagster import ConfigurableResource import duckdb + class MyWarehouse(ConfigurableResource): - path: str + path: str def get_raw_data(self): - return duckdb.sql(f"SELECT * FROM \'{self.path}\'").df() \ No newline at end of file + return duckdb.sql(f"SELECT * FROM '{self.path}'").df() diff --git a/hooli_data_eng/__init__.py b/hooli_data_eng/__init__.py index fc3cc4e1..3c25b881 100644 --- a/hooli_data_eng/__init__.py +++ b/hooli_data_eng/__init__.py @@ -1 +1 @@ -from .definitions import defs +from .definitions import defs as defs diff --git a/hooli_data_eng/assets/__init__.py b/hooli_data_eng/assets/__init__.py index 0519ecba..e69de29b 100644 --- a/hooli_data_eng/assets/__init__.py +++ b/hooli_data_eng/assets/__init__.py @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index 17c4f5e2..2d2e21b6 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -101,12 +101,17 @@ def get_metadata(self, 
dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An return {**default_metadata, **metadata} def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]): - if dbt_resource_props["name"] in ["company_stats", "locations_cleaned", "weekly_order_summary", "order_stats"]: - return allow_outdated_and_missing_parents_condition + if dbt_resource_props["name"] in [ + "company_stats", + "locations_cleaned", + "weekly_order_summary", + "order_stats", + ]: + return allow_outdated_and_missing_parents_condition + + if dbt_resource_props["name"] in ["sku_stats"]: + return AutomationCondition.on_cron("0 0 1 * *") - if dbt_resource_props["name"] in ["sku_stats"]: - return AutomationCondition.on_cron('0 0 1 * *') - if dbt_resource_props["name"] in ["company_perf"]: return AutomationCondition.any_downstream_conditions() @@ -117,7 +122,9 @@ def get_owners(self, dbt_resource_props: Mapping[str, Any]): ] -def _process_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): +def _process_partitioned_dbt_assets( + context: AssetExecutionContext, dbt: DbtCliResource +): # map partition key range to dbt vars first_partition, last_partition = context.partition_time_window dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)} diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index 396705ff..77d15f82 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -9,7 +9,6 @@ AssetIn, MonthlyPartitionsDefinition, Output, - Field, Config, AssetExecutionContext, MaterializeResult, @@ -20,10 +19,9 @@ from dagster._utils import file_relative_path from dagster_databricks import PipesDatabricksClient from databricks.sdk.service import jobs -from pydantic import Field from hooli_data_eng.utils.kind_helpers import get_kind - +from pydantic import Field # dynamically determine storage_kind based on environment storage_kind = get_kind() @@ -63,7 +61,7 @@ class modelHyperParams(Config): @asset( ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])}, io_manager_key="model_io_manager", - kinds={"scikitlearn", "S3"} + kinds={"scikitlearn", "S3"}, ) def order_forecast_model( context, weekly_order_summary: pd.DataFrame, config: modelHyperParams @@ -98,9 +96,9 @@ def order_forecast_model( io_manager_key="model_io_manager", partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"), tags={ - "core_kpis":"", - }, - kinds={"scikitlearn", storage_kind} + "core_kpis": "", + }, + kinds={"scikitlearn", storage_kind}, ) def model_stats_by_month( context, @@ -138,7 +136,7 @@ def model_stats_by_month( "order_forecast_model": AssetIn(), }, key_prefix=["FORECASTING"], - kinds={"pandas", storage_kind} + kinds={"pandas", storage_kind}, ) def predicted_orders( weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] @@ -279,4 +277,4 @@ def k8s_pod_asset( namespace="data-eng-prod", base_pod_spec=pod_spec, extras=extras, - ).get_materialize_result() \ No newline at end of file + ).get_materialize_result() diff --git a/hooli_data_eng/assets/forecasting/model.ipynb b/hooli_data_eng/assets/forecasting/model.ipynb index 99c0a5b0..b2667e00 100644 --- a/hooli_data_eng/assets/forecasting/model.ipynb +++ b/hooli_data_eng/assets/forecasting/model.ipynb @@ -6,11 +6,11 @@ "metadata": {}, "outputs": [], "source": [ - "import dagstermill\n", "import numpy as np\n", "import pandas as pd\n", "from hooli_data_eng.utils import _random_times\n", "from 
hooli_data_eng.assets.forecasting import model_func\n", + "\n", "# import plotly.express as px\n", "from plotnine import ggplot, geom_point, stat_smooth, aes" ] @@ -26,12 +26,11 @@ "outputs": [], "source": [ "# for local testing we use sample data, this will be replaced at runtime by the dagster asset\n", - "weekly_order_summary = pd.DataFrame({\n", - " \"order_date\": _random_times(100),\n", - " \"num_orders\": np.random.normal(10, 5, 100)\n", - "})\n", + "weekly_order_summary = pd.DataFrame(\n", + " {\"order_date\": _random_times(100), \"num_orders\": np.random.normal(10, 5, 100)}\n", + ")\n", "# this will be replaced by the trained model parameters\n", - "order_forecast_model = tuple([5,5])" + "order_forecast_model = tuple([5, 5])" ] }, { @@ -63,25 +62,32 @@ "# generate predictions for the true data\n", "a, b = order_forecast_model\n", "\n", - "daily_order_preds = pd.DataFrame({\n", - " 'order_date': weekly_order_summary['order_date'],\n", - " 'num_orders': model_func(weekly_order_summary['order_date'].astype(np.int64), a, b)\n", - "})\n", - "weekly_order_summary['type'] = 'observed'\n", - "daily_order_preds['type'] = 'predicted'\n", + "daily_order_preds = pd.DataFrame(\n", + " {\n", + " \"order_date\": weekly_order_summary[\"order_date\"],\n", + " \"num_orders\": model_func(\n", + " weekly_order_summary[\"order_date\"].astype(np.int64), a, b\n", + " ),\n", + " }\n", + ")\n", + "weekly_order_summary[\"type\"] = \"observed\"\n", + "daily_order_preds[\"type\"] = \"predicted\"\n", "\n", "print(len(weekly_order_summary))\n", "\n", "combined = pd.concat([weekly_order_summary, daily_order_preds])\n", - "(ggplot(combined, aes('order_date', 'num_orders', color = 'type')) + geom_point(alpha = 0.5) + stat_smooth(method='lm'))\n", + "(\n", + " ggplot(combined, aes(\"order_date\", \"num_orders\", color=\"type\"))\n", + " + geom_point(alpha=0.5)\n", + " + stat_smooth(method=\"lm\")\n", + ")\n", "\n", "# - Interactive plots (not currently supported in dagit notebook rendering)\n", - "#weekly_order_summary = weekly_order_summary.sort_values(by=\"order_date\")\n", - "#weekly_order_summary['num_orders_predicted'] = model_func(weekly_order_summary['order_date'].astype(np.int64), a, b)\n", - "#weekly_order_summary = weekly_order_summary.sort_values(by=\"order_date\")\n", - "#fig = px.line(weekly_order_summary, x=\"order_date\", y=['num_orders', 'num_orders_predicted'], markers=True)\n", - "#fig.show()\n", - "\n" + "# weekly_order_summary = weekly_order_summary.sort_values(by=\"order_date\")\n", + "# weekly_order_summary['num_orders_predicted'] = model_func(weekly_order_summary['order_date'].astype(np.int64), a, b)\n", + "# weekly_order_summary = weekly_order_summary.sort_values(by=\"order_date\")\n", + "# fig = px.line(weekly_order_summary, x=\"order_date\", y=['num_orders', 'num_orders_predicted'], markers=True)\n", + "# fig.show()" ] }, { diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index 787ff5a1..9d72b402 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -12,11 +12,10 @@ AssetCheckResult, asset_check, AssetKey, - define_asset_job, + define_asset_job, ScheduleDefinition, AssetSelection, - EnvVar,) -from dagster_snowflake import SnowflakeResource +) from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks import pandas as pd from hooli_data_eng.utils.kind_helpers import get_kind @@ -30,7 +29,7 @@ # dbt and create summaries using pandas @asset( 
key_prefix="MARKETING", - automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'), + automation_condition=AutomationCondition.on_cron("0 0 1-31/2 * *"), owners=["team:programmers", "lopp@dagsterlabs.com"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, kinds={"pandas", storage_kind}, @@ -90,23 +89,22 @@ def key_product_deepdive(context, sku_stats): min_order_freshness_check = build_last_update_freshness_checks( - assets=[min_order, - AssetKey(["RAW_DATA", "orders"]), - AssetKey(["RAW_DATA", "users"]) - ], + assets=[ + min_order, + AssetKey(["RAW_DATA", "orders"]), + AssetKey(["RAW_DATA", "users"]), + ], lower_bound_delta=datetime.timedelta( hours=24 ), # expect new data at least once a day ) avg_orders_freshness_check = build_anomaly_detection_freshness_checks( - assets=[avg_orders], - params=None + assets=[avg_orders], params=None ) min_order_freshness_check_sensor = build_sensor_for_freshness_checks( - freshness_checks=min_order_freshness_check, - minimum_interval_seconds=10*60 + freshness_checks=min_order_freshness_check, minimum_interval_seconds=10 * 60 ) avg_orders_freshness_check_schedule = ScheduleDefinition( @@ -114,6 +112,6 @@ def key_product_deepdive(context, sku_stats): cron_schedule="*/10 * * * *", job=define_asset_job( "check_avg_orders_freshness_job", - selection=AssetSelection.checks(*avg_orders_freshness_check) - ) + selection=AssetSelection.checks(*avg_orders_freshness_check), + ), ) diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index 7ea09c31..e9cb0668 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -7,7 +7,6 @@ AssetCheckResult, AssetKey, BackfillPolicy, - Backoff, build_column_schema_change_checks, Backoff, DailyPartitionsDefinition, @@ -25,16 +24,14 @@ storage_kind = get_kind() -daily_partitions = DailyPartitionsDefinition( - start_date="2023-05-25" -) +daily_partitions = DailyPartitionsDefinition(start_date="2023-05-25") def _daily_partition_seq(start, end): start = pd.to_datetime(start) end = pd.to_datetime(end) daily_diffs = int((end - start) / timedelta(hours=24)) - + return [str(start + timedelta(hours=i)) for i in range(daily_diffs)] @@ -56,36 +53,36 @@ def users(context, api: RawDataAPI) -> pd.DataFrame: users = pd.read_json(resp.json()) all_users.append(users) - return pd.concat(all_users) + @asset_check( - asset=AssetKey(["RAW_DATA", "users"]), - description="check that users are from expected companies", + asset=AssetKey(["RAW_DATA", "users"]), + description="check that users are from expected companies", ) def check_users(context, users: pd.DataFrame): - observed_companies = set(pd.unique(users['company'])) + observed_companies = set(pd.unique(users["company"])) expected_companies = {"ShopMart", "SportTime", "FamilyLtd", "DiscountStore"} return AssetCheckResult( - passed= (set(observed_companies) == expected_companies), - metadata={"result": MetadataValue.md( - f""" + passed=(set(observed_companies) == expected_companies), + metadata={ + "result": MetadataValue.md( + f""" Observed the following unexpected companies: {list(observed_companies - expected_companies)} """ - )}, - severity=AssetCheckSeverity.WARN + ) + }, + severity=AssetCheckSeverity.WARN, ) + @asset( partitions_def=daily_partitions, metadata={"partition_expr": "DT"}, retry_policy=RetryPolicy( - max_retries=3, - delay=1, - backoff=Backoff.LINEAR, - jitter=Jitter.FULL + max_retries=3, delay=1, backoff=Backoff.LINEAR, jitter=Jitter.FULL ), 
backfill_policy=BackfillPolicy.single_run(), kinds={"api", storage_kind}, @@ -99,12 +96,15 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame: resp = api.get_orders(partition) users = pd.read_json(resp.json()) all_orders.append(users) - + all_orders_df = pd.concat(all_orders) - all_orders_df['dt'] = pd.to_datetime(all_orders_df['dt'], unit = "ms") + all_orders_df["dt"] = pd.to_datetime(all_orders_df["dt"], unit="ms") return all_orders_df -raw_data_schema_checks = build_column_schema_change_checks(assets=[ - AssetKey(["RAW_DATA", "orders"]), - AssetKey(["RAW_DATA", "users"]), -]) + +raw_data_schema_checks = build_column_schema_change_checks( + assets=[ + AssetKey(["RAW_DATA", "orders"]), + AssetKey(["RAW_DATA", "users"]), + ] +) diff --git a/hooli_data_eng/blueprints/blueprints.py b/hooli_data_eng/blueprints/blueprints.py index c53b9e5c..fd4952c1 100644 --- a/hooli_data_eng/blueprints/blueprints.py +++ b/hooli_data_eng/blueprints/blueprints.py @@ -10,6 +10,7 @@ # This class is used to construct dbt jobs via yaml files + class DbtSelectScheduledJobBlueprint(Blueprint): type: Literal["dbt_select_job"] name: str @@ -27,8 +28,9 @@ def build_defs(self) -> Definitions: schedule_def = ScheduleDefinition(job=job_def, cron_schedule=self.cron) return Definitions(schedules=[schedule_def]) + # The loader will pick up any yaml files in the blueprints_jobs directory loader = YamlBlueprintsLoader( per_file_blueprint_type=DbtSelectScheduledJobBlueprint, path=Path(__file__).parent / "blueprints_jobs", -) \ No newline at end of file +) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 81cea58b..1e5a7f35 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -20,8 +20,16 @@ from hooli_data_eng.schedules import analytics_schedule from hooli_data_eng.sensors import orders_sensor, dbt_code_version_sensor from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor -from hooli_data_eng.assets.marketing import avg_orders_freshness_check, min_order_freshness_check, min_order_freshness_check_sensor, check_avg_orders, avg_orders_freshness_check_schedule -from hooli_data_eng.assets.dbt_assets import weekly_freshness_check, weekly_freshness_check_sensor +from hooli_data_eng.assets.marketing import ( + avg_orders_freshness_check, + min_order_freshness_check, + min_order_freshness_check_sensor, + avg_orders_freshness_check_schedule, +) +from hooli_data_eng.assets.dbt_assets import ( + weekly_freshness_check, + weekly_freshness_check_sensor, +) # --------------------------------------------------- # Assets @@ -66,29 +74,43 @@ # used with a project. Dagster Cloud deployments can contain mulitple projects. 
# Use Definitions.merge to include blueprints definitions -defs = Definitions.merge(loader.load_defs(), Definitions( - executor=multiprocess_executor.configured( - {"max_concurrent": 3} - ), - assets=link_code_references_to_git_if_cloud( - with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets,]), - file_path_mapping=AnchorBasedFilePathMapping( - local_file_anchor=Path(__file__), - file_anchor_path_in_repository="hooli_data_eng/definitions.py" - ) +defs = Definitions.merge( + loader.load_defs(), + Definitions( + executor=multiprocess_executor.configured({"max_concurrent": 3}), + assets=link_code_references_to_git_if_cloud( + with_source_code_references( + [ + *dbt_assets, + *raw_data_assets, + *forecasting_assets, + *marketing_assets, + ] + ), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli_data_eng/definitions.py", + ), + ), + asset_checks=[ + *raw_data_schema_checks, + *dbt_asset_checks, + check_users, + check_avg_orders, + *min_order_freshness_check, + *avg_orders_freshness_check, + *weekly_freshness_check, + ], + resources=resource_def[get_env()], + schedules=[analytics_schedule, avg_orders_freshness_check_schedule], + sensors=[ + orders_sensor, + watch_s3_sensor, + # asset_delay_alert_sensor, + min_order_freshness_check_sensor, + dbt_code_version_sensor, + weekly_freshness_check_sensor, + ], + jobs=[analytics_job, predict_job, dbt_slim_ci_job], ), - asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], - resources=resource_def[get_env()], - schedules=[analytics_schedule, avg_orders_freshness_check_schedule], - sensors=[ - orders_sensor, - watch_s3_sensor, -# asset_delay_alert_sensor, - min_order_freshness_check_sensor, - dbt_code_version_sensor, - weekly_freshness_check_sensor - ], - jobs=[analytics_job, predict_job, dbt_slim_ci_job], - ) ) - diff --git a/hooli_data_eng/jobs/__init__.py b/hooli_data_eng/jobs/__init__.py index 6c1d7c79..e88ebf94 100644 --- a/hooli_data_eng/jobs/__init__.py +++ b/hooli_data_eng/jobs/__init__.py @@ -11,7 +11,8 @@ # which is an asset representing a model in dbt analytics_job = define_asset_job( name="refresh_analytics_model_job", - selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream() - AssetSelection.keys(["CLEANED", "locations_cleaned"]), + selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream() + - AssetSelection.keys(["CLEANED", "locations_cleaned"]), tags={"dagster/max_retries": "1"}, # config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}} ) @@ -22,4 +23,4 @@ "predict_job", selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]), tags={"alert_team": "ml"}, -) \ No newline at end of file +) diff --git a/hooli_data_eng/resources/api.py b/hooli_data_eng/resources/api.py index 881383c8..b26dbb3f 100644 --- a/hooli_data_eng/resources/api.py +++ b/hooli_data_eng/resources/api.py @@ -7,36 +7,40 @@ # This demo uses the responses package to mock an API # instead of relying on a real API -import responses +import responses import requests import pandas as pd from hooli_data_eng.utils import random_data import numpy as np -import random +import random from dagster import ConfigurableResource from typing import Optional + class RawDataAPI(ConfigurableResource): - - flaky: Optional[bool] = True + flaky: Optional[bool] = True @responses.activate def 
get_orders(self, datetime_to_process): # add lots of flakiness print(f"Flakiness set to: {self.flaky} with type: {type(self.flaky)}") - if self.flaky and random.randint(0,10) <= 4: + if self.flaky and random.randint(0, 10) <= 4: raise Exception("API time out") responses.get( # fake endpoint "http://api.jaffleshop.co/v1/orders", - # random order data returned, see utils.py - json = random_data( - extra_columns={"order_id": str, "quantity": int, "purchase_price": float, "sku": str}, - n = 10, - filter_date=datetime_to_process - ).to_json() + json=random_data( + extra_columns={ + "order_id": str, + "quantity": int, + "purchase_price": float, + "sku": str, + }, + n=10, + filter_date=datetime_to_process, + ).to_json(), ) return requests.get("http://api.jaffleshop.co/v1/orders") @@ -44,24 +48,25 @@ def get_orders(self, datetime_to_process): @responses.activate def get_users(self, datetime_to_process): # add some of flakiness - if self.flaky and random.randint(0,10) <= 2: + if self.flaky and random.randint(0, 10) <= 2: raise Exception("API time out") responses.get( # fake endpoint "http://api.jaffleshop.co/v1/users", - # random user data returned - json = pd.DataFrame( + json=pd.DataFrame( { "user_id": range(10), "company": np.random.choice( ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"], size=10 ), - "is_test_user": np.random.choice([True, False], p=[0.002, 0.998], size=10), - "created_at": pd.to_datetime(datetime_to_process) + "is_test_user": np.random.choice( + [True, False], p=[0.002, 0.998], size=10 + ), + "created_at": pd.to_datetime(datetime_to_process), } - ).to_json() + ).to_json(), ) - return requests.get("http://api.jaffleshop.co/v1/users") \ No newline at end of file + return requests.get("http://api.jaffleshop.co/v1/users") diff --git a/hooli_data_eng/resources/databricks.py b/hooli_data_eng/resources/databricks.py index 66115ef0..eecc9fb1 100644 --- a/hooli_data_eng/resources/databricks.py +++ b/hooli_data_eng/resources/databricks.py @@ -1,13 +1,13 @@ # --------------------------------------------------- -# This resource allows the big_orders asset to -# run on a databricks cluster in branch or prod +# This resource allows the big_orders asset to +# run on a databricks cluster in branch or prod # (Locally the asset uses a local pyspark session) -# During asset evaluation, a new cluster is created -# and the project code is sent to the cluster using a +# During asset evaluation, a new cluster is created +# and the project code is sent to the cluster using a # *step launcher* # Existing clusters can be used instead of creating a -# job-specific cluster by changing the cluster key to +# job-specific cluster by changing the cluster key to # "cluster": {"existing": "cluster-id"} from dagster_databricks import databricks_pyspark_step_launcher @@ -15,28 +15,22 @@ import os cluster_config = { - "size": { - "num_workers": 1 - }, + "size": {"num_workers": 1}, "spark_version": "11.2.x-scala2.12", - "nodes": { - "node_types": { - "node_type_id": "i3.xlarge" - } - } + "nodes": {"node_types": {"node_type_id": "i3.xlarge"}}, } -db_step_launcher = databricks_pyspark_step_launcher.configured({ - +db_step_launcher = databricks_pyspark_step_launcher.configured( + { "run_config": { "run_name": "launch_step", "cluster": {"new": cluster_config}, - "libraries": [ + "libraries": [ {"pypi": {"package": "dagster-aws"}}, {"pypi": {"package": "dagster-pandas"}}, {"pypi": {"package": "dagster-dbt"}}, {"pypi": {"package": "dagster-duckdb"}}, - {"pypi": {"package": "dagster-duckdb-pandas"}}, + {"pypi": 
{"package": "dagster-duckdb-pandas"}}, {"pypi": {"package": "dbt-core"}}, {"pypi": {"package": "dbt-duckdb"}}, {"pypi": {"package": "dbt-snowflake"}}, @@ -58,15 +52,31 @@ "databricks_token": {"env": "DATABRICKS_TOKEN"}, "local_pipeline_package_path": str(Path(__file__).parent.parent.parent), "env_variables": { - "DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT": os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", ""), - "DAGSTER_CLOUD_DEPLOYMENT_NAME": os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") + "DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT": os.getenv( + "DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "" + ), + "DAGSTER_CLOUD_DEPLOYMENT_NAME": os.getenv( + "DAGSTER_CLOUD_DEPLOYMENT_NAME", "" + ), }, "secrets_to_env_variables": [ {"name": "DATABRICKS_HOST", "key": "adls2_key", "scope": "dagster-test"}, {"name": "DATABRICKS_TOKEN", "key": "adls2_key", "scope": "dagster-test"}, - {"name": "SNOWFLAKE_USER", "key": "snowflake_user", "scope": "dagster-test"}, - {"name": "SNOWFLAKE_PASSWORD", "key": "snowflake_password", "scope": "dagster-test"}, - {"name": "SNOWFLAKE_ACCOUNT", "key": "snowflake_account", "scope": "dagster-test"}, + { + "name": "SNOWFLAKE_USER", + "key": "snowflake_user", + "scope": "dagster-test", + }, + { + "name": "SNOWFLAKE_PASSWORD", + "key": "snowflake_password", + "scope": "dagster-test", + }, + { + "name": "SNOWFLAKE_ACCOUNT", + "key": "snowflake_account", + "scope": "dagster-test", + }, ], "storage": { "s3": { @@ -76,4 +86,4 @@ } }, } -) \ No newline at end of file +) diff --git a/hooli_data_eng/resources/dbt.py b/hooli_data_eng/resources/dbt.py index 2c9cc51b..d40584ed 100644 --- a/hooli_data_eng/resources/dbt.py +++ b/hooli_data_eng/resources/dbt.py @@ -4,14 +4,11 @@ # NO LONGER USED IN PROJECT, BUT EXAMPLE OF CUSTOMIZING AN INTEGRATION RESOURCE + class DbtCli2(DbtCliResource): profiles_dir: str - def cli(self, args: List[str], - *, - context: Optional[OpExecutionContext] = None): - + def cli(self, args: List[str], *, context: Optional[OpExecutionContext] = None): args = [*args, "--profiles-dir", self.profiles_dir] - + return super().cli(args=args, context=context) - diff --git a/hooli_data_eng/resources/warehouse.py b/hooli_data_eng/resources/warehouse.py index dd90d676..c155ce1a 100644 --- a/hooli_data_eng/resources/warehouse.py +++ b/hooli_data_eng/resources/warehouse.py @@ -1,38 +1,33 @@ -from dagster_snowflake_pandas import SnowflakePandasIOManager - from dagster_snowflake import SnowflakeIOManager from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient from dagster_snowflake_pandas import SnowflakePandasTypeHandler -from dagster._core.storage.db_io_manager import ( - DbTypeHandler, - DbIOManager -) +from dagster._core.storage.db_io_manager import DbTypeHandler, DbIOManager from dagster._core.execution.context.output import OutputContext from typing import Sequence # NO LONGER USED IN PROJECT, BUT EXAMPLE OF CUSTOMIZING AN INTEGRATION RESOURCE -class MyDBIOManager(DbIOManager): +class MyDBIOManager(DbIOManager): def _get_table_slice(self, context, output_context: OutputContext): metadata = {"partition_expr": "order_date"} - if 'orders_cleaned' in output_context.asset_key.path: + if "orders_cleaned" in output_context.asset_key.path: metadata = {"partition_expr": "dt"} - - if 'users_cleaned' in output_context.asset_key.path: + + if "users_cleaned" in output_context.asset_key.path: metadata = {"partition_expr": "created_at"} output_context._metadata = metadata return super()._get_table_slice(context=context, output_context=output_context) + class 
MySnowflakeIOManager(SnowflakeIOManager): @staticmethod def type_handlers() -> Sequence[DbTypeHandler]: return [SnowflakePandasTypeHandler()] - def create_io_manager(self, context) -> MyDBIOManager: return MyDBIOManager( db_client=SnowflakeDbClient(), @@ -41,4 +36,4 @@ def create_io_manager(self, context) -> MyDBIOManager: schema=self.schema_, type_handlers=self.type_handlers(), default_load_type=self.default_load_type(), - ) \ No newline at end of file + ) diff --git a/hooli_data_eng/schedules/__init__.py b/hooli_data_eng/schedules/__init__.py index 1d19d0b0..bc12ef51 100644 --- a/hooli_data_eng/schedules/__init__.py +++ b/hooli_data_eng/schedules/__init__.py @@ -3,4 +3,4 @@ from hooli_data_eng.jobs import analytics_job # This schedule tells dagster to run the analytics_job daily -analytics_schedule = build_schedule_from_partitioned_job(analytics_job) \ No newline at end of file +analytics_schedule = build_schedule_from_partitioned_job(analytics_job) diff --git a/hooli_data_eng/sensors/__init__.py b/hooli_data_eng/sensors/__init__.py index 287b2f71..5839fc2e 100644 --- a/hooli_data_eng/sensors/__init__.py +++ b/hooli_data_eng/sensors/__init__.py @@ -14,6 +14,7 @@ from hooli_data_eng.assets.dbt_assets import regular_dbt_assets from hooli_data_eng.utils.dbt_code_version import get_current_dbt_code_version + # This sensor listens for changes to the orders_augmented asset which # represents a dbt model. When the table managed by dbt is updated, # this sensor will trigger the predict_job above, ensuring that anytime @@ -23,24 +24,32 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): yield RunRequest(run_key=context.cursor) - @sensor(asset_selection=AssetSelection.assets(regular_dbt_assets)) def dbt_code_version_sensor(context: SensorEvaluationContext): - context.log.info(f"Checking code versions for assets: {regular_dbt_assets.keys}") assets_to_materialize = [] for asset_key in regular_dbt_assets.keys: - latest_materialization = context.instance.get_latest_materialization_event(asset_key) + latest_materialization = context.instance.get_latest_materialization_event( + asset_key + ) if latest_materialization: - latest_code_version = latest_materialization.asset_materialization.tags.get("dagster/code_version") - context.log.info(f"Latest code version for {asset_key}: {latest_code_version}") - current_code_version = get_current_dbt_code_version(asset_key) # Implement this function to get the current code version - context.log.info(f"Current code version for {asset_key}: {current_code_version}") + latest_code_version = latest_materialization.asset_materialization.tags.get( + "dagster/code_version" + ) + context.log.info( + f"Latest code version for {asset_key}: {latest_code_version}" + ) + current_code_version = get_current_dbt_code_version( + asset_key + ) # Implement this function to get the current code version + context.log.info( + f"Current code version for {asset_key}: {current_code_version}" + ) if latest_code_version != current_code_version: assets_to_materialize.append(asset_key) context.log.info(f"Assets to materialize: {assets_to_materialize}") if assets_to_materialize: yield RunRequest( run_key=f"code_version_update_{datetime.now()}", - asset_selection=list(assets_to_materialize) - ) \ No newline at end of file + asset_selection=list(assets_to_materialize), + ) diff --git a/hooli_data_eng/sensors/touch_s3_file.py b/hooli_data_eng/sensors/touch_s3_file.py index c1ccc1eb..59782037 100644 --- a/hooli_data_eng/sensors/touch_s3_file.py +++ 
b/hooli_data_eng/sensors/touch_s3_file.py @@ -4,15 +4,15 @@ # Internal Only # Run this to update the file in s3 to force a sensor # eval -# -# To update the local version of the file just run +# +# To update the local version of the file just run # touch customers.txt -BUCKET = 'hooli-demo-branch' -BUCKET = 'hooli-demo' +BUCKET = "hooli-demo-branch" +BUCKET = "hooli-demo" -dev = boto3.session.Session(profile_name='user-cloud-admin') -s3 = dev.client('s3', region_name = 'us-west-2') +dev = boto3.session.Session(profile_name="user-cloud-admin") +s3 = dev.client("s3", region_name="us-west-2") -with open('customers.txt', "rb") as f: - s3.upload_fileobj(f, BUCKET, 'customers.txt') \ No newline at end of file +with open("customers.txt", "rb") as f: + s3.upload_fileobj(f, BUCKET, "customers.txt") diff --git a/hooli_data_eng/sensors/watch_s3.py b/hooli_data_eng/sensors/watch_s3.py index 38e45075..f45aa518 100644 --- a/hooli_data_eng/sensors/watch_s3.py +++ b/hooli_data_eng/sensors/watch_s3.py @@ -3,7 +3,6 @@ # On file updates, this sensor triggers a job # To see an example of sensors off of assets, see definitions.py -from typing import Any from hooli_data_eng.resources.sensor_file_managers import ( FileSystem, ) @@ -48,7 +47,8 @@ def watch_s3_sensor(context, monitor_fs: FileSystem): if last_updated is None: yield SkipReason(f"customers.txt not found, using {env}") return - except: + except Exception as e: + context.log.error(f"Error occurred while checking customers.txt: {str(e)}") yield SkipReason(f"customers.txt not found HERE, using {env}") return diff --git a/hooli_data_eng/utils/__init__.py b/hooli_data_eng/utils/__init__.py index b262d609..0e7a9de7 100644 --- a/hooli_data_eng/utils/__init__.py +++ b/hooli_data_eng/utils/__init__.py @@ -1,7 +1,7 @@ import datetime import time import uuid -from typing import Any, Dict, List +from typing import Any, Dict import random import numpy as np import pandas as pd @@ -25,31 +25,43 @@ def _random_times(n: int): clipped_flipped_dist = np.append( clipped_flipped_dist, clipped_flipped_dist[: n - len(clipped_flipped_dist)] ) - - times = pd.to_datetime((clipped_flipped_dist * (end_u - start_u)) + start_u, unit="s") - hours = times.round('60min').to_pydatetime() + times = pd.to_datetime( + (clipped_flipped_dist * (end_u - start_u)) + start_u, unit="s" + ) + + hours = times.round("60min").to_pydatetime() return hours -def random_data(extra_columns: Dict[str, Any], n: int, filter_date = None) -> pd.DataFrame: - - skus = ["pepsi", "coke", "sprite", "coke zero", "powerade", "diet", "gingerale", "juice"] - - # always have user_id +def random_data( + extra_columns: Dict[str, Any], n: int, filter_date=None +) -> pd.DataFrame: + skus = [ + "pepsi", + "coke", + "sprite", + "coke zero", + "powerade", + "diet", + "gingerale", + "juice", + ] + + # always have user_id data = {"user_id": np.random.randint(0, 1000, size=n)} for name, dtype in extra_columns.items(): - if name == "sku": + if name == "sku": data[name] = random.choices(skus, k=n) - elif dtype == str: + elif isinstance(dtype, str): data[name] = [str(uuid.uuid4()) for _ in range(n)] - elif dtype == int: + elif isinstance(dtype, int): data[name] = np.random.randint(0, 100, size=n) - elif dtype == float: + elif isinstance(dtype, float): data[name] = 100 * np.random.random(size=n) - + data = pd.DataFrame(data) if filter_date: diff --git a/hooli_data_eng/utils/dbt_code_version.py b/hooli_data_eng/utils/dbt_code_version.py index dcf139dd..c8d9621f 100644 --- a/hooli_data_eng/utils/dbt_code_version.py +++ 
b/hooli_data_eng/utils/dbt_code_version.py @@ -3,11 +3,12 @@ from dagster import AssetKey from hooli_data_eng.project import dbt_project + def get_current_dbt_code_version(asset_key: AssetKey) -> str: with open(dbt_project.manifest_path) as f: manifest = json.load(f) - + model_name = asset_key.path[-1] model_sql = manifest["nodes"][f"model.dbt_project.{model_name}"]["raw_code"] - - return hashlib.sha1(model_sql.encode("utf-8")).hexdigest() \ No newline at end of file + + return hashlib.sha1(model_sql.encode("utf-8")).hexdigest() diff --git a/hooli_data_eng/utils/kind_helpers.py b/hooli_data_eng/utils/kind_helpers.py index 09524f76..41c0cb87 100644 --- a/hooli_data_eng/utils/kind_helpers.py +++ b/hooli_data_eng/utils/kind_helpers.py @@ -1,5 +1,6 @@ import os + def get_kind() -> str: """ Determine the storage kind based on the environment. @@ -8,8 +9,8 @@ def get_kind() -> str: str: The storage kind ('snowflake' or 'duckdb'). """ if ( - os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod" or - os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1" + os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod" + or os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1" ): return "snowflake" - return "duckdb" \ No newline at end of file + return "duckdb" diff --git a/hooli_data_eng_tests/test_assets.py b/hooli_data_eng_tests/test_assets.py index 936abc99..59de7d6d 100644 --- a/hooli_data_eng_tests/test_assets.py +++ b/hooli_data_eng_tests/test_assets.py @@ -1,5 +1,4 @@ - -from hooli_data_eng.assets.raw_data import orders, users, hourly_partitions, build_op_context +from hooli_data_eng.assets.raw_data import orders, users, build_op_context from hooli_data_eng.resources.api import RawDataAPI from dagster import define_asset_job, Definitions @@ -8,32 +7,32 @@ import duckdb import os + def test_orders_single_partition(): orders_df = orders( build_op_context( - resources={"api": RawDataAPI(flaky=False)}, - partition_key="2023-04-10-22:00" + resources={"api": RawDataAPI(flaky=False)}, partition_key="2023-04-10-22:00" ) ) assert len(orders_df) == 10 + def test_users_single_partition(): users_df = users( build_op_context( - resources={"api": RawDataAPI(flaky=False)}, - partition_key="2023-04-10-22:00" + resources={"api": RawDataAPI(flaky=False)}, partition_key="2023-04-10-22:00" ) ) assert len(users_df) == 10 + def test_orders_multi_partition_backfill(): test_job = define_asset_job("test_job", selection=["orders"]) - test_resources = {"io_manager": DuckDBPandasIOManager(database="test.duckdb"), "api": RawDataAPI(flaky=False)} - defs = Definitions( - assets=[orders], - jobs=[test_job], - resources = test_resources - ) + test_resources = { + "io_manager": DuckDBPandasIOManager(database="test.duckdb"), + "api": RawDataAPI(flaky=False), + } + defs = Definitions(assets=[orders], jobs=[test_job], resources=test_resources) test_job_def = defs.get_job_def("test_job") test_job_def.execute_in_process( @@ -41,23 +40,22 @@ def test_orders_multi_partition_backfill(): "dagster/asset_partition_range_start": "2022-04-10-20:00", "dagster/asset_partition_range_end": "2022-04-10-22:00", }, - resources=test_resources + resources=test_resources, ) con = duckdb.connect("test.duckdb") orders_df = con.execute("SELECT * FROM public.orders").fetchdf() os.remove("test.duckdb") assert len(orders_df) == 30 - + def test_users_multi_partition_backfill(): test_job = define_asset_job("test_job", selection=["users"]) - test_resources = {"io_manager": DuckDBPandasIOManager(database="test.duckdb"), "api": 
RawDataAPI(flaky=False)} - defs = Definitions( - assets=[users], - jobs=[test_job], - resources = test_resources - ) + test_resources = { + "io_manager": DuckDBPandasIOManager(database="test.duckdb"), + "api": RawDataAPI(flaky=False), + } + defs = Definitions(assets=[users], jobs=[test_job], resources=test_resources) test_job_def = defs.get_job_def("test_job") test_job_def.execute_in_process( @@ -65,7 +63,7 @@ def test_users_multi_partition_backfill(): "dagster/asset_partition_range_start": "2022-04-10-20:00", "dagster/asset_partition_range_end": "2022-04-10-22:00", }, - resources=test_resources + resources=test_resources, ) con = duckdb.connect("test.duckdb") diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index e19f2c46..86c77862 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -14,6 +14,7 @@ from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from dagster_snowflake import SnowflakeResource + # Used to derive environment (LOCAL, BRANCH, PROD) def get_env(): if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": @@ -22,6 +23,7 @@ def get_env(): return "PROD" return "LOCAL" + # Setting connection details by environment resource_def = { "LOCAL": { @@ -47,17 +49,23 @@ def get_env(): snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule( "2023-10-29-00:00", snowflake_resource_key="snowflake_insights", - snowflake_usage_latency=45 + snowflake_usage_latency=45, ) defs = Definitions( assets=link_code_references_to_git_if_cloud( - with_source_code_references([*snowflake_insights_definitions.assets,]), + with_source_code_references( + [ + *snowflake_insights_definitions.assets, + ] + ), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), file_anchor_path_in_repository="hooli_snowflake_insights/definitions.py", ), ), - schedules=[snowflake_insights_definitions.schedule,], + schedules=[ + snowflake_insights_definitions.schedule, + ], resources=resource_def[get_env()], ) diff --git a/pyproject.toml b/pyproject.toml index f3d8f07f..38a59f19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ dev = [ "dagster-webserver", "pytest", + "ruff", ] sling = ["dagster-embedded-elt"]
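The changes above wire Ruff into the project in three places: a "ruff" entry in the dev dependencies of pyproject.toml, a Makefile target that runs "ruff check --fix ." followed by "ruff format .", and an astral-sh/ruff-action@v3 step in the deploy workflow invoked with "check --fix --output-format=github .". The patch itself adds no [tool.ruff] section, so any linter configuration either lives elsewhere or falls back to Ruff's defaults; the snippet below is only a minimal sketch of what such a section could look like, with assumed values that are not part of this change.

[tool.ruff]
# hypothetical example settings -- this patch does not add a [tool.ruff] section
line-length = 88

[tool.ruff.lint]
# enable pycodestyle errors, pyflakes, and import sorting (assumed selection)
select = ["E", "F", "I"]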