From 6ffc1e67b800e2b09b6114f124e60e34fefd034d Mon Sep 17 00:00:00 2001 From: Joseph Van Drunen Date: Tue, 19 Sep 2023 11:46:09 -0400 Subject: [PATCH] push --- purina_usage/__init__.py | 10 ++++---- purina_usage/utils/dagster_insights.py | 32 ++++++++++++-------------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/purina_usage/__init__.py b/purina_usage/__init__.py index 3b267bb..c37becd 100644 --- a/purina_usage/__init__.py +++ b/purina_usage/__init__.py @@ -34,20 +34,18 @@ raw_data, group_name="raw_data", # all of these assets live in the snowflake database, under the schema raw_data - key_prefix=["snowflake", "raw_data"], + key_prefix=["raw_data"], ) usage_assets = load_assets_from_package_module( usage, - group_name="snowflake_usage", + group_name="snowflake_insights", # all of these assets live in the snowflake database, under the schema usage_data - key_prefix=["snowflake", "usage_data"], + key_prefix=["snowflake", "insights_data"], ) # define jobs as selections over the larger graph -raw_job = define_asset_job( - "raw_job", selection=["snowflake/raw_data/users", "snowflake/raw_data/orders"] -) +raw_job = define_asset_job("raw_job", selection=["raw_data/users", "raw_data/orders"]) resources = { # this io_manager allows us to load dbt models as pandas dataframes diff --git a/purina_usage/utils/dagster_insights.py b/purina_usage/utils/dagster_insights.py index 5221c5e..f26aae6 100644 --- a/purina_usage/utils/dagster_insights.py +++ b/purina_usage/utils/dagster_insights.py @@ -97,7 +97,16 @@ def get_snowflake_usage(context, query_id, database): break context.log.info("waiting for snowflake usage data") time.sleep(30) - return rows + return [ + { + "metricValue": rows[0][6], + "metricName": "snowflake_credits", + }, + { + "metricValue": rows[0][2], + "metricName": "bytes_processed", + }, + ] def store_dbt_adapter_metrics( @@ -134,23 +143,14 @@ def store_dbt_adapter_metrics( continue # yield (adapter_response_key, result['adapter_response'][adapter_response_key]) if adapter_response_key == "query_id" and "database" in node: - context.log.info( - get_snowflake_usage( - context, result["adapter_response"][adapter_response_key], node["database"] - ) + snowflake_metrics = get_snowflake_usage( + context, result["adapter_response"][adapter_response_key], node["database"] ) - context.log.info( - f"metric: {node['name']}.{adapter_response_key}: {result['adapter_response'][adapter_response_key]}" - ) - metric_values.append( - { - "metricValue": result["adapter_response"][adapter_response_key], - "metricName": adapter_response_key, - } - ) + metric_values.append(*snowflake_metrics) assetMetricDefinitions.append( { "assetKey": node["name"], + "assetGroup": "", "metricValues": metric_values, } ) @@ -207,9 +207,7 @@ def put_context_metrics( context.dagster_run.external_job_origin.external_repository_origin.repository_name ), "assetKey": selected_asset_key, - "assetGroup": context.assets_def.group_names_by_key.get( - selected_asset_key, None - ), + "assetGroup": context.assets_def.group_names_by_key.get(selected_asset_key, ""), "metricValues": [ { "metricValue": metric_def.metric_value,