Skip to content

Commit

Permalink
Merge pull request #63 from dagster-io/izzy/add_compiled_sql
Browse files Browse the repository at this point in the history
Add compiled SQL for dbt models to the event logs
  • Loading branch information
izzye84 authored Mar 11, 2024
2 parents ad1b3c3 + 864310c commit 4f0c7c8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dbt_project/models/CLEANED/locations_cleaned.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ from {{ source("RAW_DATA", "locations") }}
source_renamed as (

select l_user_id as user_id,
l_street_address as streed_address,
l_street_address as street_address,
l_state as state,
l_country as country,
l_zip_code as zip_code,
Expand Down
23 changes: 23 additions & 0 deletions dbt_project/models/CLEANED/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ models:
data_type: "float"
tests:
- greater_than_zero

- name: users_cleaned
description: "Raw users data with test accounts removed"
columns:
Expand All @@ -40,3 +41,25 @@ models:
- name: "created_at"
description: "When the user account was created"
data_type: "timestamp"

- name: locations_cleaned
description: "Locations data with standardized column names"
columns:
- name: "user_id"
description: "The unique identifier for the user"
data_type: "int"
- name: "street_address"
description: "The user's street address"
data_type: "str"
- name: "state"
description: "The state portion of the user's address"
data_type: "str"
- name: "country"
description: "The country portion of the user's address"
data_type: "str"
- name: "zip_code"
description: "The zip code portion of the user's address"
data_type: "str"
- name: "_sling_loaded_at"
description: "When the record was loaded by Sling"
data_type: "int"
13 changes: 12 additions & 1 deletion hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,19 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso
dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)}
dbt_args = ["build", "--vars", json.dumps(dbt_vars)]

# Invoke dbt CLI
dbt_cli_task = dbt.cli(dbt_args, context=context)

# Emits an AssetObservation for each asset materialization, which is used to
# identify the Snowflake credit consumption
yield from dbt_with_snowflake_insights(context, dbt_cli_task)

# fetch run_results.json to log compiled SQL
run_results_json = dbt_cli_task.get_artifact("run_results.json")
for result in run_results_json["results"]:
model_name = result.get("unique_id")
context.log.info(f"Compiled SQL for {model_name}:\n{result['compiled_code']}")


@dbt_assets(
manifest=DBT_MANIFEST,
Expand Down Expand Up @@ -161,7 +170,9 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):


# This op will be used to run slim CI
@op
@op(
out={}
)
def dbt_slim_ci(dbt2: DbtCliResource):
slim_ci_manifest = SLIM_CI_MANIFEST if SLIM_CI_MANIFEST.exists() else DBT_MANIFEST.parent

Expand Down

0 comments on commit 4f0c7c8

Please sign in to comment.