Skip to content

Commit

Permalink
use DbtProject in hooli_data_eng (#78)
Browse files Browse the repository at this point in the history
expected changes to hooli data eng to move from DbtArtifacts to DbtProject and use the new state handling features

---------

Co-authored-by: Christian Minich <[email protected]>
  • Loading branch information
alangenfeld and cnolanminich authored Apr 15, 2024
1 parent 617a990 commit e55b651
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 114 deletions.
46 changes: 16 additions & 30 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
steps:
- name: Pre-run checks
id: prerun
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38

- name: Checkout
uses: actions/checkout@v3
Expand All @@ -36,14 +36,14 @@ jobs:
- name: Validate configuration
id: ci-validate
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci check --project-dir ${{ env.DAGSTER_PROJECT_DIR }} --dagster-cloud-yaml-path ${{ env.DAGSTER_CLOUD_YAML_PATH }}"

- name: Initialize build session
id: ci-init
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
project_dir: ${{ env.DAGSTER_PROJECT_DIR }}
dagster_cloud_yaml_path: ${{ env.DAGSTER_CLOUD_YAML_PATH }}
Expand All @@ -66,28 +66,14 @@ jobs:
if: ${{ steps.prerun.outputs.result != 'skip' }}
uses: aws-actions/amazon-ecr-login@v1

# Copy the production manifest.json to local directory - Raise error if not retrieved
- name: Download dbt manifest from S3
if: steps.prerun.outputs.result != 'skip' && github.event_name == 'pull_request'
continue-on-error: true
run: |
output=$(aws s3 cp s3://hooli-demo/dbt_slim_ci/manifest.json ./dbt_project/target/slim_ci/manifest.json 2>&1) || echo "Error: $output"
# Build 'data-eng-pipeline' code location
- name: Build dbt manifest for data-eng-pipeline
- name: Prepare dbt project
if: steps.prerun.outputs.result != 'skip'
run: |
pip install pyopenssl --upgrade;
pip install click --upgrade;
pip install dbt-core dbt-duckdb dbt-snowflake;
pip install pip --upgrade;
pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager;
make deps
make manifest
# Copy production manifest.json to S3 on merge
- name: Upload dbt manifest to S3
if: steps.prerun.outputs.result != 'skip' && github.event_name != 'pull_request'
run: |
aws s3 cp ./dbt_project/target/manifest.json s3://hooli-demo/dbt_slim_ci/manifest.json
dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py
dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py
- name: Build and upload Docker image for data-eng-pipeline
if: steps.prerun.outputs.result != 'skip'
Expand All @@ -100,7 +86,7 @@ jobs:
- name: Update build session with image tag for data-eng-pipeline
id: ci-set-build-output-data-eng-pipeline
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci set-build-output --location-name=data-eng-pipeline --image-tag=$IMAGE_TAG-data-eng-pipeline"

Expand All @@ -116,7 +102,7 @@ jobs:
- name: Update build session with image tag for basics
id: ci-set-build-output-basics
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci set-build-output --location-name=basics --image-tag=$IMAGE_TAG-basics"

Expand All @@ -132,7 +118,7 @@ jobs:
- name: Update build session with image tag for batch enrichment
id: ci-set-build-output-batch-enrichment
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"

Expand All @@ -148,7 +134,7 @@ jobs:
- name: Update build session with image tag for snowflake insights
id: ci-set-build-output-snowflake-insights
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

Expand All @@ -164,7 +150,7 @@ jobs:
- name: Update build session with image tag for demo_assets
id: ci-set-build-output-demo-assets
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets"

Expand All @@ -181,7 +167,7 @@ jobs:
- name: Deploy to Dagster Cloud
id: ci-deploy
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci deploy"

Expand All @@ -207,13 +193,13 @@ jobs:
- name: Update PR comment for branch deployments
id: ci-notify
if: steps.prerun.outputs.result != 'skip' && always()
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci notify --project-dir=${{ env.DAGSTER_PROJECT_DIR }}"

- name: Generate summary
id: ci-summary
if: steps.prerun.outputs.result != 'skip' && always()
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].27
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected].38
with:
command: "ci status --output-format=markdown >> $GITHUB_STEP_SUMMARY"
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ locally: manifest
clear
dagster dev

clean:
clean:
rm -rf ~/.dagster_home; mkdir ~/.dagster_home; cp dagster.yaml ~/.dagster_home/dagster.yaml

manifest:
dbt parse --project-dir=dbt_project --profiles-dir=dbt_project/config --target BRANCH
dbt parse --project-dir=dbt_project --target BRANCH

deps:
dbt deps --project-dir=dbt_project --profiles-dir=dbt_project/config
dbt deps --project-dir=dbt_project

stateful_dev: clean manifest
export DAGSTER_HOME="~/.dagster_home"; dagster dev
Expand Down
1 change: 1 addition & 0 deletions dbt_project/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.user.yml
1 change: 0 additions & 1 deletion dbt_project/config/.user.yml

This file was deleted.

2 changes: 1 addition & 1 deletion dbt_project/packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ packages:
version: 0.10.3
- git: "https://github.com/dagster-io/dagster.git"
subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster"
revision: "1.7.0" # replace with the version of `dagster` you are using.
revision: "1.7.1" # replace with the version of `dagster` you are using.
File renamed without changes.
87 changes: 37 additions & 50 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import textwrap
import os
from pathlib import Path
from typing import Any, Mapping
from dagster import (
Expand All @@ -16,16 +15,14 @@
)
from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
from dagster_dbt import (
DbtCliResource,
DbtCliResource,
DagsterDbtTranslator,
load_assets_from_dbt_project,
default_metadata_from_dbt_resource_props,
DagsterDbtTranslatorSettings,
DbtArtifacts,
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster._utils import file_relative_path
from hooli_data_eng.resources import get_env
from hooli_data_eng.resources import dbt_project

# many dbt assets use an incremental approach to avoid
# re-processing all data on each run
Expand All @@ -34,25 +31,11 @@
daily_partitions = DailyPartitionsDefinition(start_date="2023-05-25")
weekly_partitions = WeeklyPartitionsDefinition(start_date="2023-05-25")

DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config")

# new in 1.6.9, DbtArtifacts is an experimental class that creates a manifest on load
# if DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is present,
# otherwise it points to the already-built manifest
dbt_artifacts = DbtArtifacts(
project_dir=DBT_PROJECT_DIR,
prepare_command=["--quiet",
"parse",
"--target",
get_env(),
"--profiles-dir",
DBT_PROFILES_DIR],
)
DBT_MANIFEST = dbt_artifacts.manifest_path

DBT_MANIFEST = dbt_project.manifest_path

# this manifest represents the last successful dbt deployment and will be compared against the current deployment
SLIM_CI_MANIFEST = Path(
SLIM_CI_MANIFEST = Path(
file_relative_path(__file__, "../../dbt_project/target/slim_ci/")
)

Expand All @@ -61,8 +44,8 @@
)

allow_outdated_and_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing() # non-partitioned assets should run even if some upstream partitions are missing
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(), # non-partitioned assets should run even if some upstream partitions are missing
)


Expand Down Expand Up @@ -110,22 +93,21 @@ def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An

return {**default_metadata, **metadata}

def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]):
return allow_outdated_parents_policy

def get_owners(
self, dbt_resource_props: Mapping[str, Any] ):
return [dbt_resource_props['group']['owner']['email'], f"team:{dbt_resource_props['group']['name']}"]

def get_owners(self, dbt_resource_props: Mapping[str, Any]):
return [
dbt_resource_props["group"]["owner"]["email"],
f"team:{dbt_resource_props['group']['name']}",
]


class CustomDagsterDbtTranslatorForViews(CustomDagsterDbtTranslator):
def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]):
return allow_outdated_and_missing_parents_policy


def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
# map partition key range to dbt vars
first_partition, last_partition = context.asset_partitions_time_window_for_output(
Expand All @@ -137,7 +119,7 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso
# Invoke dbt CLI
dbt_cli_task = dbt.cli(dbt_args, context=context)

# Emits an AssetObservation for each asset materialization, which is used to
# Emits an AssetObservation for each asset materialization, which is used to
# identify the Snowflake credit consumption
yield from dbt_with_snowflake_insights(context, dbt_cli_task)

Expand All @@ -152,7 +134,9 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso
manifest=DBT_MANIFEST,
select="orders_cleaned users_cleaned orders_augmented",
partitions_def=daily_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
dagster_dbt_translator=CustomDagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
),
backfill_policy=BackfillPolicy.single_run(),
)
def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
Expand All @@ -163,7 +147,9 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
manifest=DBT_MANIFEST,
select="weekly_order_summary order_stats",
partitions_def=weekly_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(DagsterDbtTranslatorSettings(enable_asset_checks=True)),
dagster_dbt_translator=CustomDagsterDbtTranslator(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
),
backfill_policy=BackfillPolicy.single_run(),
)
def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
Expand All @@ -174,14 +160,16 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
manifest=DBT_MANIFEST,
select="company_perf sku_stats company_stats locations_cleaned",
partitions_def=weekly_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews(DagsterDbtTranslatorSettings(enable_asset_checks=True)),
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
),
backfill_policy=BackfillPolicy.single_run(),
)
def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# Invoke dbt CLI
dbt_cli_task = dbt2.cli(["build"], context=context)

# Emits an AssetObservation for each asset materialization, which is used to
# Emits an AssetObservation for each asset materialization, which is used to
# identify the Snowflake credit consumption
yield from dbt_with_snowflake_insights(context, dbt_cli_task)

Expand All @@ -193,26 +181,25 @@ def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):


# This op will be used to run slim CI
@op(
out={}
)
@op(out={})
def dbt_slim_ci(dbt2: DbtCliResource):
slim_ci_manifest = SLIM_CI_MANIFEST if SLIM_CI_MANIFEST.exists() else DBT_MANIFEST.parent

dbt_command = [
"build",
"--select", "state:modified+",
"--defer",
"--state", f"{slim_ci_manifest}"
"--select",
"state:modified+",
*dbt2.get_defer_args(),
]

yield from dbt2.cli(
args=dbt_command,
manifest=DBT_MANIFEST,
dagster_dbt_translator=CustomDagsterDbtTranslator(DagsterDbtTranslatorSettings(enable_asset_checks=True))
).stream()
dagster_dbt_translator=CustomDagsterDbtTranslator(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
),
).stream()


# This job will be triggered by Pull Request and should only run new or changed dbt models
@job
def dbt_slim_ci_job():
dbt_slim_ci()
dbt_slim_ci()
20 changes: 20 additions & 0 deletions hooli_data_eng/project.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
from dagster_dbt import DbtProject
from dagster._utils import file_relative_path


def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"


DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")

dbt_project = DbtProject(
project_dir=DBT_PROJECT_DIR,
state_path="target/slim_ci",
target=get_env(),
)
Loading

0 comments on commit e55b651

Please sign in to comment.