diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml
index 77ace13a..5650548b 100644
--- a/.github/workflows/deploy-dagster-cloud.yml
+++ b/.github/workflows/deploy-dagster-cloud.yml
@@ -25,7 +25,7 @@ jobs:
     steps:
       - name: Pre-run checks
         id: prerun
-        uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1.38

       - name: Checkout
         uses: actions/checkout@v3
@@ -36,14 +36,14 @@ jobs:
       - name: Validate configuration
         id: ci-validate
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci check --project-dir ${{ env.DAGSTER_PROJECT_DIR }} --dagster-cloud-yaml-path ${{ env.DAGSTER_CLOUD_YAML_PATH }}"

       - name: Initialize build session
         id: ci-init
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/ci-init@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/ci-init@v0.1.38
         with:
           project_dir: ${{ env.DAGSTER_PROJECT_DIR }}
           dagster_cloud_yaml_path: ${{ env.DAGSTER_CLOUD_YAML_PATH }}
@@ -66,28 +66,14 @@ jobs:
         if: ${{ steps.prerun.outputs.result != 'skip' }}
         uses: aws-actions/amazon-ecr-login@v1

-      # Copy the production manifest.json to local directory - Raise error if not retrieved
-      - name: Download dbt manifest from S3
-        if: steps.prerun.outputs.result != 'skip' && github.event_name == 'pull_request'
-        continue-on-error: true
-        run: |
-          output=$(aws s3 cp s3://hooli-demo/dbt_slim_ci/manifest.json ./dbt_project/target/slim_ci/manifest.json 2>&1) || echo "Error: $output"
-
-      # Build 'data-eng-pipeline' code location
-      - name: Build dbt manifest for data-eng-pipeline
+      - name: Prepare dbt project
         if: steps.prerun.outputs.result != 'skip'
         run: |
-          pip install pyopenssl --upgrade;
-          pip install click --upgrade;
-          pip install dbt-core dbt-duckdb dbt-snowflake;
+          pip install pip --upgrade;
+          pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager;
           make deps
-          make manifest
-
-      # Copy production manifest.json to S3 on merge
-      - name: Upload dbt manifest to S3
-        if: steps.prerun.outputs.result != 'skip' && github.event_name != 'pull_request'
-        run: |
-          aws s3 cp ./dbt_project/target/manifest.json s3://hooli-demo/dbt_slim_ci/manifest.json
+          dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py
+          dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py

       - name: Build and upload Docker image for data-eng-pipeline
         if: steps.prerun.outputs.result != 'skip'
@@ -100,7 +86,7 @@ jobs:
       - name: Update build session with image tag for data-eng-pipeline
         id: ci-set-build-output-data-eng-pipeline
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci set-build-output --location-name=data-eng-pipeline --image-tag=$IMAGE_TAG-data-eng-pipeline"
@@ -116,7 +102,7 @@ jobs:
       - name: Update build session with image tag for basics
         id: ci-set-build-output-basics
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci set-build-output --location-name=basics --image-tag=$IMAGE_TAG-basics"
@@ -132,7 +118,7 @@ jobs:
       - name: Update build session with image tag for batch enrichment
         id: ci-set-build-output-batch-enrichment
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"
@@ -148,7 +134,7 @@ jobs:
       - name: Update build session with image tag for snowflake insights
         id: ci-set-build-output-snowflake-insights
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"
@@ -164,7 +150,7 @@ jobs:
       - name: Update build session with image tag for demo_assets
         id: ci-set-build-output-demo-assets
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets"
@@ -181,7 +167,7 @@ jobs:
       - name: Deploy to Dagster Cloud
         id: ci-deploy
         if: steps.prerun.outputs.result != 'skip'
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci deploy"
@@ -207,13 +193,13 @@ jobs:
       - name: Update PR comment for branch deployments
         id: ci-notify
         if: steps.prerun.outputs.result != 'skip' && always()
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci notify --project-dir=${{ env.DAGSTER_PROJECT_DIR }}"

       - name: Generate summary
         id: ci-summary
         if: steps.prerun.outputs.result != 'skip' && always()
-        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38
         with:
           command: "ci status --output-format=markdown >> $GITHUB_STEP_SUMMARY"
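The workflow above stops shuttling manifest.json through S3: the manifest is now built at deploy time by the dagster-dbt CLI from the DbtProject declared in hooli_data_eng/project.py, and slim CI state handling moves to dagster-cloud. A rough local equivalent of the new "Prepare dbt project" step, assuming both packages are installed:

```python
# Sketch of the two CLI calls above; both read the DbtProject defined in
# hooli_data_eng/project.py. The manage-state call is only meaningful
# inside a Dagster Cloud CI run.
import subprocess

for cmd in (
    # parse the dbt project and write the manifest that DbtProject points at
    ["dagster-dbt", "project", "prepare-for-deployment",
     "--file", "hooli_data_eng/project.py"],
    # pull/store the deployment state used for slim CI deferral
    ["dagster-cloud", "ci", "dagster-dbt", "project", "manage-state",
     "--file", "hooli_data_eng/project.py"],
):
    subprocess.run(cmd, check=True)
```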
diff --git a/Makefile b/Makefile
index 412c7f16..d7fd0161 100644
--- a/Makefile
+++ b/Makefile
@@ -2,14 +2,14 @@ locally: manifest clear
 	dagster dev

-clean: 
+clean:
 	rm -rf ~/.dagster_home; mkdir ~/.dagster_home; cp dagster.yaml ~/.dagster_home/dagster.yaml

 manifest:
-	dbt parse --project-dir=dbt_project --profiles-dir=dbt_project/config --target BRANCH
+	dbt parse --project-dir=dbt_project --target BRANCH

 deps:
-	dbt deps --project-dir=dbt_project --profiles-dir=dbt_project/config
+	dbt deps --project-dir=dbt_project

 stateful_dev: clean manifest
 	export DAGSTER_HOME="~/.dagster_home"; dagster dev
diff --git a/dbt_project/.gitignore b/dbt_project/.gitignore
new file mode 100644
index 00000000..ecbed503
--- /dev/null
+++ b/dbt_project/.gitignore
@@ -0,0 +1 @@
+.user.yml
diff --git a/dbt_project/config/.user.yml b/dbt_project/config/.user.yml
deleted file mode 100644
index c09c9e2e..00000000
--- a/dbt_project/config/.user.yml
+++ /dev/null
@@ -1 +0,0 @@
-id: 063ec9a5-3e6a-4a40-9f8c-4c2c222f86cd
diff --git a/dbt_project/packages.yml b/dbt_project/packages.yml
index 93aafe7f..94823a78 100644
--- a/dbt_project/packages.yml
+++ b/dbt_project/packages.yml
@@ -3,4 +3,4 @@ packages:
     version: 0.10.3
   - git: "https://github.com/dagster-io/dagster.git"
     subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster"
-    revision: "1.7.0" # replace with the version of `dagster` you are using.
\ No newline at end of file
+    revision: "1.7.1" # replace with the version of `dagster` you are using.
diff --git a/dbt_project/config/profiles.yml b/dbt_project/profiles.yml
similarity index 100%
rename from dbt_project/config/profiles.yml
rename to dbt_project/profiles.yml
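The Makefile and packages.yml changes follow from one move: profiles.yml now lives at the dbt project root rather than in dbt_project/config/, so dbt finds it next to dbt_project.yml and the --profiles-dir flags can be dropped. A quick check of the assumed layout, run from the repo root:

```python
# Sanity-check sketch for the new layout: profiles.yml sits beside
# dbt_project.yml, so `dbt parse --project-dir=dbt_project` needs no
# --profiles-dir flag.
from pathlib import Path

project_dir = Path("dbt_project")
assert (project_dir / "dbt_project.yml").is_file()
assert (project_dir / "profiles.yml").is_file()  # moved from dbt_project/config/
```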
diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py
index 39affec7..5c956b80 100644
--- a/hooli_data_eng/assets/dbt_assets.py
+++ b/hooli_data_eng/assets/dbt_assets.py
@@ -1,6 +1,5 @@
 import json
 import textwrap
-import os
 from pathlib import Path
 from typing import Any, Mapping
 from dagster import (
@@ -16,16 +15,14 @@
 )
 from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
 from dagster_dbt import (
-    DbtCliResource, 
+    DbtCliResource,
     DagsterDbtTranslator,
-    load_assets_from_dbt_project,
     default_metadata_from_dbt_resource_props,
     DagsterDbtTranslatorSettings,
-    DbtArtifacts,
 )
 from dagster_dbt.asset_decorator import dbt_assets
 from dagster._utils import file_relative_path
-from hooli_data_eng.resources import get_env
+from hooli_data_eng.resources import dbt_project

 # many dbt assets use an incremental approach to avoid
 # re-processing all data on each run
@@ -34,25 +31,11 @@
 daily_partitions = DailyPartitionsDefinition(start_date="2023-05-25")
 weekly_partitions = WeeklyPartitionsDefinition(start_date="2023-05-25")

-DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
-DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config")
-
-# new in 1.6.9, DbtArtifacts is an experimental class that creates a manifest on load
-# if DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is present,
-# otherwise it points to the already-built manifest
-dbt_artifacts = DbtArtifacts(
-    project_dir=DBT_PROJECT_DIR,
-    prepare_command=["--quiet",
-        "parse",
-        "--target",
-        get_env(),
-        "--profiles-dir",
-        DBT_PROFILES_DIR],
-)
-DBT_MANIFEST = dbt_artifacts.manifest_path
+
+DBT_MANIFEST = dbt_project.manifest_path

 # this manifest represents the last successful dbt deployment and will be compared against the current deployment
-SLIM_CI_MANIFEST = Path( 
+SLIM_CI_MANIFEST = Path(
     file_relative_path(__file__, "../../dbt_project/target/slim_ci/")
 )
@@ -61,8 +44,8 @@
 )

 allow_outdated_and_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
-    AutoMaterializeRule.skip_on_parent_outdated(), 
-    AutoMaterializeRule.skip_on_parent_missing() # non-partitioned assets should run even if some upstream partitions are missing
+    AutoMaterializeRule.skip_on_parent_outdated(),
+    AutoMaterializeRule.skip_on_parent_missing(),  # non-partitioned assets should run even if some upstream partitions are missing
 )
@@ -110,22 +93,21 @@
         return {**default_metadata, **metadata}

-    def get_auto_materialize_policy(
-        self, dbt_resource_props: Mapping[str, Any]
-    ):
+    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]):
         return allow_outdated_parents_policy
-    
-    def get_owners(
-        self, dbt_resource_props: Mapping[str, Any] ):
-        return [dbt_resource_props['group']['owner']['email'], f"team:{dbt_resource_props['group']['name']}"]
+
+    def get_owners(self, dbt_resource_props: Mapping[str, Any]):
+        return [
+            dbt_resource_props["group"]["owner"]["email"],
+            f"team:{dbt_resource_props['group']['name']}",
+        ]


 class CustomDagsterDbtTranslatorForViews(CustomDagsterDbtTranslator):
-    def get_auto_materialize_policy(
-        self, dbt_resource_props: Mapping[str, Any]
-    ):
+    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]):
         return allow_outdated_and_missing_parents_policy

+
 def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
     # map partition key range to dbt vars
     first_partition, last_partition = context.asset_partitions_time_window_for_output(
@@ -137,7 +119,7 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso
     # Invoke dbt CLI
     dbt_cli_task = dbt.cli(dbt_args, context=context)

-    # Emits an AssetObservation for each asset materialization, which is used to 
+    # Emits an AssetObservation for each asset materialization, which is used to
     # identify the Snowflake credit consumption
     yield from dbt_with_snowflake_insights(context, dbt_cli_task)
@@ -152,7 +134,9 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso
     manifest=DBT_MANIFEST,
     select="orders_cleaned users_cleaned orders_augmented",
     partitions_def=daily_partitions,
-    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
+    dagster_dbt_translator=CustomDagsterDbtTranslator(
+        settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
+    ),
     backfill_policy=BackfillPolicy.single_run(),
 )
 def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
@@ -163,7 +147,9 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
     manifest=DBT_MANIFEST,
     select="weekly_order_summary order_stats",
     partitions_def=weekly_partitions,
-    dagster_dbt_translator=CustomDagsterDbtTranslator(DagsterDbtTranslatorSettings(enable_asset_checks=True)),
+    dagster_dbt_translator=CustomDagsterDbtTranslator(
+        DagsterDbtTranslatorSettings(enable_asset_checks=True)
+    ),
     backfill_policy=BackfillPolicy.single_run(),
 )
 def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
@@ -174,14 +160,16 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
     manifest=DBT_MANIFEST,
     select="company_perf sku_stats company_stats locations_cleaned",
     partitions_def=weekly_partitions,
-    dagster_dbt_translator=CustomDagsterDbtTranslatorForViews(DagsterDbtTranslatorSettings(enable_asset_checks=True)),
+    dagster_dbt_translator=CustomDagsterDbtTranslatorForViews(
+        DagsterDbtTranslatorSettings(enable_asset_checks=True)
+    ),
     backfill_policy=BackfillPolicy.single_run(),
 )
 def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
     # Invoke dbt CLI
     dbt_cli_task = dbt2.cli(["build"], context=context)

-    # Emits an AssetObservation for each asset materialization, which is used to 
+    # Emits an AssetObservation for each asset materialization, which is used to
     # identify the Snowflake credit consumption
     yield from dbt_with_snowflake_insights(context, dbt_cli_task)
@@ -193,26 +181,25 @@ def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):

 # This op will be used to run slim CI
-@op(
-    out={}
-)
+@op(out={})
 def dbt_slim_ci(dbt2: DbtCliResource):
-    slim_ci_manifest = SLIM_CI_MANIFEST if SLIM_CI_MANIFEST.exists() else DBT_MANIFEST.parent
-
     dbt_command = [
         "build",
-        "--select", "state:modified+",
-        "--defer",
-        "--state", f"{slim_ci_manifest}"
+        "--select",
+        "state:modified+",
+        *dbt2.get_defer_args(),
     ]

     yield from dbt2.cli(
         args=dbt_command,
         manifest=DBT_MANIFEST,
-        dagster_dbt_translator=CustomDagsterDbtTranslator(DagsterDbtTranslatorSettings(enable_asset_checks=True))
-    ).stream()
+        dagster_dbt_translator=CustomDagsterDbtTranslator(
+            DagsterDbtTranslatorSettings(enable_asset_checks=True)
+        ),
+    ).stream()
+

 # This job will be triggered by Pull Request and should only run new or changed dbt models
 @job
 def dbt_slim_ci_job():
-    dbt_slim_ci()
\ No newline at end of file
+    dbt_slim_ci()
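Two things change in dbt_assets.py: the experimental DbtArtifacts bootstrap is replaced by the DbtProject defined in hooli_data_eng/project.py, and the slim CI op no longer hand-builds the --defer/--state flags, delegating to DbtCliResource.get_defer_args(). A sketch of the assumed behavior of that helper, not the dagster-dbt implementation:

```python
# Illustration only: with a DbtProject whose state_path is "target/slim_ci",
# get_defer_args() should expand to roughly these flags, and to nothing when
# no saved state exists (e.g. a fresh branch deployment).
from pathlib import Path
from typing import List

def defer_args_sketch(state_dir: Path) -> List[str]:
    if not (state_dir / "manifest.json").exists():
        return []  # no prior deployment state: build everything, no deferral
    return ["--defer", "--state", str(state_dir)]

print(defer_args_sketch(Path("dbt_project/target/slim_ci")))
```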
diff --git a/hooli_data_eng/project.py b/hooli_data_eng/project.py
new file mode 100644
index 00000000..a1749340
--- /dev/null
+++ b/hooli_data_eng/project.py
@@ -0,0 +1,20 @@
+import os
+from dagster_dbt import DbtProject
+from dagster._utils import file_relative_path
+
+
+def get_env():
+    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
+        return "BRANCH"
+    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
+        return "PROD"
+    return "LOCAL"
+
+
+DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
+
+dbt_project = DbtProject(
+    project_dir=DBT_PROJECT_DIR,
+    state_path="target/slim_ci",
+    target=get_env(),
+)
diff --git a/hooli_data_eng/resources/__init__.py b/hooli_data_eng/resources/__init__.py
index 7145d119..77c09db1 100644
--- a/hooli_data_eng/resources/__init__.py
+++ b/hooli_data_eng/resources/__init__.py
@@ -10,6 +10,11 @@
 from dagster_snowflake_pandas import SnowflakePandasIOManager
 from dagstermill import ConfigurableLocalOutputNotebookIOManager

+from hooli_data_eng.project import (
+    dbt_project,
+    get_env,
+    DBT_PROJECT_DIR,
+)
 from hooli_data_eng.resources.api import RawDataAPI
 from hooli_data_eng.resources.databricks import db_step_launcher
@@ -37,14 +42,6 @@
 # and S3 resources

-def get_env():
-    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
-        return "BRANCH"
-    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
-        return "PROD"
-    return "LOCAL"
-
-
 client = mock.MagicMock()

 if get_env() == "PROD":
@@ -58,9 +55,6 @@
 # specifies what databases to targets, and locally will
 # execute against a DuckDB

-DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
-DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config")
-
 # Similar to having different dbt targets, here we create the resource
 # configuration by environment
@@ -73,12 +67,8 @@
         "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
         "api": RawDataAPI.configure_at_launch(),
         "s3": ResourceDefinition.none_resource(),
-        "dbt": DbtCliClientResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
-        ),
-        "dbt2": DbtCliResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
-        ),
+        "dbt": DbtCliClientResource(project_dir=DBT_PROJECT_DIR, target="LOCAL"),
+        "dbt2": DbtCliResource(project_dir=dbt_project, target="LOCAL"),
         "pyspark": pyspark_resource,
         "step_launcher": ResourceDefinition.none_resource(),
         "monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
@@ -102,12 +92,8 @@
         ),
         "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
         "api": RawDataAPI.configure_at_launch(),
-        "dbt": DbtCliClientResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
-        ),
-        "dbt2": DbtCliResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
-        ),
+        "dbt": DbtCliClientResource(project_dir=DBT_PROJECT_DIR, target="BRANCH"),
+        "dbt2": DbtCliResource(project_dir=dbt_project, target="BRANCH"),
         "pyspark": pyspark_resource,
         "step_launcher": db_step_launcher,
         "monitor_fs": s3FileSystem(
@@ -131,12 +117,8 @@
         ),
         "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
         "api": RawDataAPI.configure_at_launch(),
-        "dbt": DbtCliClientResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
-        ),
-        "dbt2": DbtCliResource(
-            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
-        ),
+        "dbt": DbtCliClientResource(project_dir=DBT_PROJECT_DIR, target="PROD"),
+        "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, target="PROD"),
         "pyspark": pyspark_resource,
         "step_launcher": db_step_launcher,
         "monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
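With get_env and DBT_PROJECT_DIR relocated to hooli_data_eng/project.py and re-imported here, environment detection now has a single home. Condensed, the selection pattern in this module looks like the following (dictionary bodies elided, variable names assumed):

```python
# One get_env() call picks both the dbt target and the resource set that the
# code location hands to Definitions.
from hooli_data_eng.project import get_env

resource_def = {
    "LOCAL": {},   # DuckDB, mocked clients, local filesystem
    "BRANCH": {},  # Snowflake + S3 against the branch deployment
    "PROD": {},    # Snowflake + S3 against production
}
resources = resource_def[get_env()]
```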