Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramshackle-Jamathon committed Sep 19, 2023
1 parent 3cc639a commit 91d2ff6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 70 deletions.
81 changes: 14 additions & 67 deletions purina_usage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,34 @@
import os
from pathlib import Path

from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
fs_io_manager,
load_assets_from_package_module,
)
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from purina_usage.assets import raw_data, usage
from purina_usage.assets import raw_data, usage, dbt_snowflake
import os
from pathlib import Path

from dagster import OpExecutionContext
from .utils import dagster_insights
from dagster_dbt import DbtCliResource

dbt_project_dir = Path(__file__).joinpath("..", "..", "dbt_project").resolve()
dbt = DbtCliResource(
dbt_cli_resource = DbtCliResource(
project_dir=os.fspath(dbt_project_dir),
profiles_dir=os.fspath(dbt_project_dir.joinpath("config")),
target="staging",
)

# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at run time.
# Otherwise, we expect a manifest to be present in the project's target directory.
# if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
# dbt_parse_invocation = dbt.cli(["parse"]).wait()
# dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
# else:
# dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")


dbt_parse_invocation = dbt.cli(["parse"]).wait()
dbt_parse_invocation = dbt_cli_resource.cli(["parse"]).wait()
dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")

# class CustomDagsterDbtTranslator(DagsterDbtTranslator):
# @classmethod
# def get_group_name(cls, _unused):
# return "test_group"


@dbt_assets(manifest=dbt_manifest_path)
def dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and then report adapter metrics for the run.

    Events produced by the dbt invocation are yielded as they stream in.
    Once the stream is exhausted, the ``run_results.json`` and
    ``manifest.json`` artifacts of that invocation are read and handed to
    ``dagster_insights.store_dbt_adapter_metrics``.

    Args:
        context: Execution context, passed to the dbt invocation and to the
            metrics helper.
        dbt: The dbt CLI resource used to launch the ``build`` command.
    """
    dbt_cli_invocation = dbt.cli(["build"], context=context)
    yield from dbt_cli_invocation.stream()

    # Everything below runs only after the stream above has been fully
    # consumed, so the artifacts are read after the build events were emitted.
    run_results = dbt_cli_invocation.get_artifact("run_results.json")
    manifest = dbt_cli_invocation.get_artifact("manifest.json")
    dagster_insights.store_dbt_adapter_metrics(context, manifest, run_results)


def partition_key_to_dbt_vars(partition_key) -> dict:
    """Return the dbt ``vars`` mapping for a partition key.

    Args:
        partition_key: The partition key to expose to dbt.

    Returns:
        A single-entry dict mapping ``"run_date"`` to ``partition_key``.
    """
    return {"run_date": partition_key}


# all assets live in the default dbt_schema
# dbt_assets = load_assets_from_dbt_project(
# DBT_PROJECT_DIR,
# DBT_PROFILES_DIR,
# # prefix the output assets based on the database they live in plus the name of the schema
# key_prefix=["snowflake", "dbt_schema"],
# # prefix the source assets based on just the database
# # (dagster populates the source schema information automatically)
# source_key_prefix=["snowflake"],
# # partitions_def=DailyPartitionsDefinition(start_date="2023-07-01"),
# # partition_key_to_vars_fn=partition_key_to_dbt_vars,
# )
# Load every asset defined in the dbt_snowflake package into one asset group.
dbt_snowflake_assets = load_assets_from_package_module(
dbt_snowflake,
group_name="snowflake_dbt",
# asset keys for these assets are prefixed with snowflake/dbt
# (not raw_data, which is the prefix used for the raw_data assets)
key_prefix=["snowflake", "dbt"],
)

raw_data_assets = load_assets_from_package_module(
raw_data,
Expand All @@ -86,20 +48,6 @@ def partition_key_to_dbt_vars(partition_key):
raw_job = define_asset_job(
"raw_job", selection=["snowflake/raw_data/users", "snowflake/raw_data/orders"]
)
# dbt_job = define_asset_job(
# "dbt_job",
# selection=[
# "snowflake/dbt_schema/company_perf",
# "snowflake/dbt_schema/company_stats",
# "snowflake/dbt_schema/daily_order_summary",
# "snowflake/dbt_schema/order_stats",
# "snowflake/dbt_schema/orders_augmented",
# "snowflake/dbt_schema/orders_cleaned",
# "snowflake/dbt_schema/sku_stats",
# "snowflake/dbt_schema/top_users",
# "snowflake/dbt_schema/users_cleaned",
# ],
# )

resources = {
# this io_manager allows us to load dbt models as pandas dataframes
Expand All @@ -115,15 +63,14 @@ def partition_key_to_dbt_vars(partition_key):
# this io_manager is responsible for storing/loading our pickled machine learning model
"model_io_manager": fs_io_manager,
# this resource is used to execute dbt cli commands
"dbt": dbt,
"dbt": dbt_snowflake.dbt_cli_resource,
}


defs = Definitions(
assets=[dbt_snowflake_assets, *raw_data_assets, *usage_assets],
assets=[*dbt_snowflake_assets, *raw_data_assets, *usage_assets],
resources=resources,
schedules=[
ScheduleDefinition(job=raw_job, cron_schedule="@daily"),
# ScheduleDefinition(job=dbt_job, cron_schedule="*/5 * * * *"),
],
)
27 changes: 27 additions & 0 deletions purina_usage/assets/dbt_snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
from pathlib import Path

from dagster_dbt import DbtCliResource, dbt_assets

from dagster import OpExecutionContext
from ...utils import dagster_insights

# The dbt project directory is reached by walking four ".." segments up from
# this file's path; resolve() turns the result into an absolute path.
dbt_project_dir = (Path(__file__) / ".." / ".." / ".." / ".." / "dbt_project").resolve()

# Resource used to invoke the dbt CLI against the "staging" target, with
# profiles read from the project's "config" sub-directory.
dbt_cli_resource = DbtCliResource(
    project_dir=os.fspath(dbt_project_dir),
    profiles_dir=os.fspath(dbt_project_dir / "config"),
    target="staging",
)

# Run `dbt parse` when this module is imported and wait for it to finish, so
# that a manifest.json exists in the invocation's target path.
dbt_parse_invocation = dbt_cli_resource.cli(["parse"]).wait()
dbt_manifest_path = dbt_parse_invocation.target_path / "manifest.json"


@dbt_assets(manifest=dbt_manifest_path)
def dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and then report adapter metrics for the run.

    Events produced by the dbt invocation are yielded as they stream in.
    Once the stream is exhausted, the ``run_results.json`` and
    ``manifest.json`` artifacts of that invocation are read and handed to
    ``dagster_insights.store_dbt_adapter_metrics``.

    Args:
        context: Execution context, passed to the dbt invocation and to the
            metrics helper.
        dbt: The dbt CLI resource used to launch the ``build`` command.
    """
    dbt_cli_invocation = dbt.cli(["build"], context=context)
    yield from dbt_cli_invocation.stream()

    # Everything below runs only after the stream above has been fully
    # consumed, so the artifacts are read after the build events were emitted.
    run_results = dbt_cli_invocation.get_artifact("run_results.json")
    manifest = dbt_cli_invocation.get_artifact("manifest.json")
    dagster_insights.store_dbt_adapter_metrics(context, manifest, run_results)
7 changes: 4 additions & 3 deletions purina_usage/utils/dagster_insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"""


def get_snowflake_usage(query_id, database):
def get_snowflake_usage(context, query_id, database):
con = snowflake.connector.connect(
user=os.getenv("DAGSTER_INSIGHTS_SNOWFLAKE_USER"),
password=os.getenv("DAGSTER_INSIGHTS_SNOWFLAKE_PASSWORD"),
Expand Down Expand Up @@ -73,14 +73,14 @@ def get_snowflake_usage(query_id, database):
"""

while True:

# Execute the query
cur.execute(query)

# Fetch all the rows
rows = cur.fetchall()
if len(rows) > 0:
break
context.log.info("waiting for snowflake usage data")
time.sleep(30)
return rows

Expand Down Expand Up @@ -121,7 +121,7 @@ def store_dbt_adapter_metrics(
if adapter_response_key == "query_id" and "database" in node:
context.log.info(
get_snowflake_usage(
result["adapter_response"][adapter_response_key], node["database"]
context, result["adapter_response"][adapter_response_key], node["database"]
)
)
context.log.info(
Expand Down Expand Up @@ -149,6 +149,7 @@ def store_dbt_adapter_metrics(
"assetMetricDefinitions": assetMetricDefinitions,
}

context.log.info(metric_graphql_input)
variables = {"metrics": metric_graphql_input}
response = requests.post(
URL,
Expand Down

0 comments on commit 91d2ff6

Please sign in to comment.