Skip to content

Commit

Permalink
use dagster-insights
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramshackle-Jamathon committed Sep 20, 2023
1 parent 328d825 commit a79041b
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 256 deletions.
28 changes: 26 additions & 2 deletions purina_usage/assets/dbt_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,21 @@
from dagster_dbt import DbtCliResource, dbt_assets

from dagster import OpExecutionContext
from ...utils import dagster_insights
from gql.transport.requests import RequestsHTTPTransport
from dagster_insights import DagsterInsightsClient, SnowflakeConnectionDetails

transport = RequestsHTTPTransport(
url="http://localhost:3000/test/staging/graphql",
use_json=True,
timeout=300,
headers={"Dagster-Cloud-Api-Token": "user:test:joe"},
)
dagster_insights = DagsterInsightsClient(
organization_id="test",
deployment="test",
cloud_user_token="",
transport=transport,
)

dbt_project_dir = Path(__file__).joinpath("..", "..", "..", "..", "dbt_project").resolve()
dbt_cli_resource = DbtCliResource(
Expand All @@ -24,4 +38,14 @@ def dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):

run_results = dbt_cli_invocation.get_artifact("run_results.json")
manifest = dbt_cli_invocation.get_artifact("manifest.json")
dagster_insights.store_dbt_adapter_metrics(context, manifest, run_results)
dagster_insights.store_dbt_adapter_metrics(
context,
manifest,
run_results,
snowflake_connection_details=SnowflakeConnectionDetails(
user=os.getenv("SNOWFLAKE_USER", ""),
password=os.getenv("SNOWFLAKE_PASSWORD", ""),
account="na94824.us-east-1",
warehouse="DEVELOPMENT",
),
)
20 changes: 17 additions & 3 deletions purina_usage/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,22 @@
from typing import Generator, Any
from dagster import OpExecutionContext, Output

from gql.transport.requests import RequestsHTTPTransport
from purina_usage.utils import random_data
from ...utils import dagster_insights
from dagster_insights import DagsterInsightsClient, DagsterInsightsMetric

transport = RequestsHTTPTransport(
url="http://localhost:3000/test/staging/graphql",
use_json=True,
timeout=300,
headers={"Dagster-Cloud-Api-Token": "user:test:joe"},
)
dagster_insights = DagsterInsightsClient(
organization_id="test",
deployment="test",
cloud_user_token="",
transport=transport,
)


@asset(compute_kind="random")
Expand All @@ -24,7 +38,7 @@ def users(context: OpExecutionContext) -> Generator[Output[pd.DataFrame], Any, A
dagster_insights.put_context_metrics(
context,
metrics=[
dagster_insights.Metric(
DagsterInsightsMetric(
metric_name="rows_affected",
metric_value=len(data),
)
Expand All @@ -43,7 +57,7 @@ def orders(context: OpExecutionContext) -> Generator[Output[pd.DataFrame], Any,
dagster_insights.put_context_metrics(
context,
metrics=[
dagster_insights.Metric(
DagsterInsightsMetric(
metric_name="rows_affected",
metric_value=len(data),
)
Expand Down
251 changes: 0 additions & 251 deletions purina_usage/utils/dagster_insights.py

This file was deleted.

1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
install_requires=[
"dagster",
"dagster-cloud",
"dagster-insights",
"pandas",
"snowflake-connector-python[pandas]",
"snowflake.sqlalchemy",
Expand Down

0 comments on commit a79041b

Please sign in to comment.