From 91d2ff6b5f12d46806fd4d88988ff22d1e0265a3 Mon Sep 17 00:00:00 2001
From: Joseph Van Drunen
Date: Tue, 19 Sep 2023 09:54:14 -0400
Subject: [PATCH] fixup: move dbt asset definitions into assets/dbt_snowflake

---
 purina_usage/__init__.py                      | 81 ++++---------------
 purina_usage/assets/dbt_snowflake/__init__.py | 27 +++++++
 purina_usage/utils/dagster_insights.py        |  7 +-
 3 files changed, 45 insertions(+), 70 deletions(-)
 create mode 100644 purina_usage/assets/dbt_snowflake/__init__.py

diff --git a/purina_usage/__init__.py b/purina_usage/__init__.py
index e7afb99..3b267bb 100644
--- a/purina_usage/__init__.py
+++ b/purina_usage/__init__.py
@@ -1,6 +1,3 @@
-import os
-from pathlib import Path
-
 from dagster import (
     Definitions,
     ScheduleDefinition,
@@ -8,65 +5,30 @@
     fs_io_manager,
     load_assets_from_package_module,
 )
-from dagster_dbt import DbtCliResource, dbt_assets
 from dagster_snowflake_pandas import snowflake_pandas_io_manager
 
-from purina_usage.assets import raw_data, usage
+from purina_usage.assets import raw_data, usage, dbt_snowflake
+import os
+from pathlib import Path
 
-from dagster import OpExecutionContext
-from .utils import dagster_insights
+from dagster_dbt import DbtCliResource
 
 dbt_project_dir = Path(__file__).joinpath("..", "..", "dbt_project").resolve()
-dbt = DbtCliResource(
+dbt_cli_resource = DbtCliResource(
     project_dir=os.fspath(dbt_project_dir),
     profiles_dir=os.fspath(dbt_project_dir.joinpath("config")),
     target="staging",
 )
 
-# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at run time.
-# Otherwise, we expect a manifest to be present in the project's target directory.
-# if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
-#     dbt_parse_invocation = dbt.cli(["parse"]).wait()
-#     dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
-# else:
-#     dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
-
-
-dbt_parse_invocation = dbt.cli(["parse"]).wait()
+dbt_parse_invocation = dbt_cli_resource.cli(["parse"]).wait()
 dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
 
-# class CustomDagsterDbtTranslator(DagsterDbtTranslator):
-#     @classmethod
-#     def get_group_name(cls, _unused):
-#         return "test_group"
-
-
-@dbt_assets(manifest=dbt_manifest_path)
-def dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):
-    dbt_cli_invocation = dbt.cli(["build"], context=context)
-    yield from dbt_cli_invocation.stream()
-
-    run_results = dbt_cli_invocation.get_artifact("run_results.json")
-    manifest = dbt_cli_invocation.get_artifact("manifest.json")
-    dagster_insights.store_dbt_adapter_metrics(context, manifest, run_results)
-
-
-def partition_key_to_dbt_vars(partition_key):
-    return {"run_date": partition_key}
-
-
-# all assets live in the default dbt_schema
-# dbt_assets = load_assets_from_dbt_project(
-#     DBT_PROJECT_DIR,
-#     DBT_PROFILES_DIR,
-#     # prefix the output assets based on the database they live in plus the name of the schema
-#     key_prefix=["snowflake", "dbt_schema"],
-#     # prefix the source assets based on just the database
-#     # (dagster populates the source schema information automatically)
-#     source_key_prefix=["snowflake"],
-#     # partitions_def=DailyPartitionsDefinition(start_date="2023-07-01"),
-#     # partition_key_to_vars_fn=partition_key_to_dbt_vars,
-# )
+dbt_snowflake_assets = load_assets_from_package_module(
+    dbt_snowflake,
+    group_name="snowflake_dbt",
+    # all of these assets live in the snowflake database, under the dbt schema
+    key_prefix=["snowflake", "dbt"],
+)
 
 raw_data_assets = load_assets_from_package_module(
     raw_data,
@@ -86,20 +48,6 @@ def partition_key_to_dbt_vars(partition_key):
 raw_job = define_asset_job(
     "raw_job", selection=["snowflake/raw_data/users", "snowflake/raw_data/orders"]
 )
-# dbt_job = define_asset_job(
-#     "dbt_job",
-#     selection=[
-#         "snowflake/dbt_schema/company_perf",
-#         "snowflake/dbt_schema/company_stats",
-#         "snowflake/dbt_schema/daily_order_summary",
-#         "snowflake/dbt_schema/order_stats",
-#         "snowflake/dbt_schema/orders_augmented",
-#         "snowflake/dbt_schema/orders_cleaned",
-#         "snowflake/dbt_schema/sku_stats",
-#         "snowflake/dbt_schema/top_users",
-#         "snowflake/dbt_schema/users_cleaned",
-#     ],
-# )
 
 resources = {
     # this io_manager allows us to load dbt models as pandas dataframes
@@ -115,15 +63,14 @@ def partition_key_to_dbt_vars(partition_key):
     # this io_manager is responsible for storing/loading our pickled machine learning model
     "model_io_manager": fs_io_manager,
     # this resource is used to execute dbt cli commands
-    "dbt": dbt,
+    "dbt": dbt_snowflake.dbt_cli_resource,
 }
 
 defs = Definitions(
-    assets=[dbt_snowflake_assets, *raw_data_assets, *usage_assets],
+    assets=[*dbt_snowflake_assets, *raw_data_assets, *usage_assets],
     resources=resources,
     schedules=[
         ScheduleDefinition(job=raw_job, cron_schedule="@daily"),
-        # ScheduleDefinition(job=dbt_job, cron_schedule="*/5 * * * *"),
     ],
 )
 
diff --git a/purina_usage/assets/dbt_snowflake/__init__.py b/purina_usage/assets/dbt_snowflake/__init__.py
new file mode 100644
index 0000000..2d5183a
--- /dev/null
+++ b/purina_usage/assets/dbt_snowflake/__init__.py
@@ -0,0 +1,27 @@
+import os
+from pathlib import Path
+
+from dagster_dbt import DbtCliResource, dbt_assets
+
+from dagster import OpExecutionContext
+from ...utils import dagster_insights
+
+dbt_project_dir = Path(__file__).joinpath("..", "..", "..", "..", "dbt_project").resolve()
+dbt_cli_resource = DbtCliResource(
+    project_dir=os.fspath(dbt_project_dir),
+    profiles_dir=os.fspath(dbt_project_dir.joinpath("config")),
+    target="staging",
+)
+
+dbt_parse_invocation = dbt_cli_resource.cli(["parse"]).wait()
+dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
+
+
+@dbt_assets(manifest=dbt_manifest_path)
+def dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):
+    dbt_cli_invocation = dbt.cli(["build"], context=context)
+    yield from dbt_cli_invocation.stream()
+
+    run_results = dbt_cli_invocation.get_artifact("run_results.json")
+    manifest = dbt_cli_invocation.get_artifact("manifest.json")
+    dagster_insights.store_dbt_adapter_metrics(context, manifest, run_results)
diff --git a/purina_usage/utils/dagster_insights.py b/purina_usage/utils/dagster_insights.py
index 78049a3..ba6185b 100644
--- a/purina_usage/utils/dagster_insights.py
+++ b/purina_usage/utils/dagster_insights.py
@@ -44,7 +44,7 @@
 """
 
 
-def get_snowflake_usage(query_id, database):
+def get_snowflake_usage(context, query_id, database):
     con = snowflake.connector.connect(
         user=os.getenv("DAGSTER_INSIGHTS_SNOWFLAKE_USER"),
         password=os.getenv("DAGSTER_INSIGHTS_SNOWFLAKE_PASSWORD"),
@@ -73,7 +73,6 @@ def get_snowflake_usage(query_id, database):
     """
 
     while True:
-        # Execute the query
         cur.execute(query)
 
 
@@ -81,6 +80,7 @@ def get_snowflake_usage(query_id, database):
         rows = cur.fetchall()
         if len(rows) > 0:
             break
+        context.log.info("waiting for snowflake usage data")
         time.sleep(30)
 
     return rows
@@ -121,7 +121,7 @@ def store_dbt_adapter_metrics(
             if adapter_response_key == "query_id" and "database" in node:
                 context.log.info(
                     get_snowflake_usage(
-                        result["adapter_response"][adapter_response_key], node["database"]
node["database"] + context, result["adapter_response"][adapter_response_key], node["database"] ) ) context.log.info( @@ -149,6 +149,7 @@ def store_dbt_adapter_metrics( "assetMetricDefinitions": assetMetricDefinitions, } + context.log.info(metric_graphql_input) variables = {"metrics": metric_graphql_input} response = requests.post( URL,