Skip to content

Commit

Permalink
push
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramshackle-Jamathon committed Sep 19, 2023
1 parent 67f5692 commit 6ffc1e6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 23 deletions.
10 changes: 4 additions & 6 deletions purina_usage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@
raw_data,
group_name="raw_data",
# all of these assets live in the snowflake database, under the schema raw_data
key_prefix=["snowflake", "raw_data"],
key_prefix=["raw_data"],
)

usage_assets = load_assets_from_package_module(
usage,
group_name="snowflake_usage",
group_name="snowflake_insights",
# all of these assets live in the snowflake database, under the schema usage_data
key_prefix=["snowflake", "usage_data"],
key_prefix=["snowflake", "insights_data"],
)

# define jobs as selections over the larger graph
raw_job = define_asset_job(
"raw_job", selection=["snowflake/raw_data/users", "snowflake/raw_data/orders"]
)
raw_job = define_asset_job("raw_job", selection=["raw_data/users", "raw_data/orders"])

resources = {
# this io_manager allows us to load dbt models as pandas dataframes
Expand Down
32 changes: 15 additions & 17 deletions purina_usage/utils/dagster_insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,16 @@ def get_snowflake_usage(context, query_id, database):
break
context.log.info("waiting for snowflake usage data")
time.sleep(30)
return rows
return [
{
"metricValue": rows[0][6],
"metricName": "snowflake_credits",
},
{
"metricValue": rows[0][2],
"metricName": "bytes_processed",
},
]


def store_dbt_adapter_metrics(
Expand Down Expand Up @@ -134,23 +143,14 @@ def store_dbt_adapter_metrics(
continue
# yield (adapter_response_key, result['adapter_response'][adapter_response_key])
if adapter_response_key == "query_id" and "database" in node:
context.log.info(
get_snowflake_usage(
context, result["adapter_response"][adapter_response_key], node["database"]
)
snowflake_metrics = get_snowflake_usage(
context, result["adapter_response"][adapter_response_key], node["database"]
)
context.log.info(
f"metric: {node['name']}.{adapter_response_key}: {result['adapter_response'][adapter_response_key]}"
)
metric_values.append(
{
"metricValue": result["adapter_response"][adapter_response_key],
"metricName": adapter_response_key,
}
)
metric_values.append(*snowflake_metrics)
assetMetricDefinitions.append(
{
"assetKey": node["name"],
"assetGroup": "",
"metricValues": metric_values,
}
)
Expand Down Expand Up @@ -207,9 +207,7 @@ def put_context_metrics(
context.dagster_run.external_job_origin.external_repository_origin.repository_name
),
"assetKey": selected_asset_key,
"assetGroup": context.assets_def.group_names_by_key.get(
selected_asset_key, None
),
"assetGroup": context.assets_def.group_names_by_key.get(selected_asset_key, ""),
"metricValues": [
{
"metricValue": metric_def.metric_value,
Expand Down

0 comments on commit 6ffc1e6

Please sign in to comment.