
Commit 8bd0d1e

Update examples to remove Dagster code from __init__.py (part 2) (#20809)
## Summary & Motivation

Part 2 of #20771. This PR updates `assets*` and `experimental/*`
examples.

Examples that should be updated when we update the docs:
- doc_snippets
- project_*
- tutorial*

Edit 07/24:

Updated to match changes in PR #23155 
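
To illustrate the target layout, here is a minimal sketch (not code from this PR; the asset and project names are hypothetical): the `Definitions` object lives in a `definitions.py` module rather than the package `__init__.py`, and each example's `pyproject.toml` points `[tool.dagster] module_name` at `<package>.definitions` and sets `code_location_name` to the package name, as the diffs below show.

```python
# definitions.py -- minimal sketch of the layout these examples move to.
# `my_asset` is a hypothetical asset name, not one from the examples.
from dagster import Definitions, asset


@asset
def my_asset() -> int:
    return 1


defs = Definitions(assets=[my_asset])
```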

## How I Tested These Changes

Each example was tested locally and in BK (Buildkite).
maximearmstrong authored Jul 24, 2024
1 parent b754b20 commit 8bd0d1e
Showing 41 changed files with 285 additions and 280 deletions.
4 changes: 2 additions & 2 deletions docs/content/guides/dagster/software-defined-assets.mdx
@@ -70,7 +70,7 @@ To load definitions such as assets and resources, we use <PyObject object="Defin

- We supply resources mapped to the assets using the `resources` argument to the <PyObject object="Definitions" /> object.

-```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/__init__.py startafter=gather_assets_start endbefore=gather_assets_end
+```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/definitions.py startafter=gather_assets_start endbefore=gather_assets_end
# __init__.py
from dagster import Definitions, load_assets_from_modules

@@ -145,7 +145,7 @@ def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:

Here's an extended version of `weather_assets` that contains the new asset:

-```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/__init__.py startafter=gather_spark_assets_start endbefore=gather_spark_assets_end
+```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/definitions.py startafter=gather_spark_assets_start endbefore=gather_spark_assets_end
# __init__.py

from dagster import Definitions, load_assets_from_modules
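
For context on the docs snippet above, here is a minimal, self-contained sketch of the pattern it describes: supplying resources mapped to assets through the `resources` argument of `Definitions`. The asset and I/O manager names are hypothetical stand-ins, not the ones from `assets_pandas_pyspark`.

```python
from dagster import ConfigurableIOManager, Definitions, asset


class InMemoryIOManager(ConfigurableIOManager):
    """Hypothetical I/O manager; a real one would persist outputs somewhere durable."""

    def handle_output(self, context, obj):
        # Store the asset's output; omitted in this sketch.
        pass

    def load_input(self, context):
        # Load the upstream asset's value; omitted in this sketch.
        return None


@asset
def my_table() -> list:
    return [1, 2, 3]


defs = Definitions(
    assets=[my_table],
    resources={"io_manager": InMemoryIOManager()},
)
```
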
56 changes: 0 additions & 56 deletions examples/assets_dbt_python/assets_dbt_python/__init__.py
@@ -1,56 +0,0 @@
import os

from dagster import (
Definitions,
FilesystemIOManager,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
)
from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import (
build_sensor_for_freshness_checks,
)
from dagster_duckdb_pandas import DuckDBPandasIOManager

from .assets import forecasting, raw_data
from .assets.dbt import DBT_PROJECT_DIR, dbt_project_assets, dbt_resource

raw_data_assets = load_assets_from_package_module(
raw_data,
group_name="raw_data",
# all of these assets live in the duckdb database, under the schema raw_data
key_prefix=["duckdb", "raw_data"],
)

forecasting_assets = load_assets_from_package_module(
forecasting,
group_name="forecasting",
)
all_assets_checks = [*forecasting.forecasting_freshness_checks]

# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason.
freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=all_assets_checks)

# define jobs as selections over the larger graph
everything_job = define_asset_job("everything_everywhere_job", selection="*")
forecast_job = define_asset_job("refresh_forecast_model_job", selection="*order_forecast_model")

resources = {
# this io_manager allows us to load dbt models as pandas dataframes
"io_manager": DuckDBPandasIOManager(database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")),
# this io_manager is responsible for storing/loading our pickled machine learning model
"model_io_manager": FilesystemIOManager(),
# this resource is used to execute dbt cli commands
"dbt": dbt_resource,
}

defs = Definitions(
assets=[dbt_project_assets, *raw_data_assets, *forecasting_assets],
resources=resources,
asset_checks=all_assets_checks,
schedules=[
ScheduleDefinition(job=everything_job, cron_schedule="@weekly"),
ScheduleDefinition(job=forecast_job, cron_schedule="@daily"),
],
sensors=[freshness_check_sensor],
)
@@ -3,7 +3,7 @@
from dagster import AssetExecutionContext, AssetKey, file_relative_path
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets, get_asset_key_for_model

-DBT_PROJECT_DIR = file_relative_path(__file__, "../../../dbt_project")
+DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")

dbt_resource = DbtCliResource(project_dir=DBT_PROJECT_DIR)
dbt_parse_invocation = dbt_resource.cli(["parse"]).wait()
@@ -6,7 +6,7 @@
from dagster import AssetIn, asset, build_last_update_freshness_checks
from scipy import optimize

-from ..dbt import daily_order_summary_asset_key
+from .dbt import daily_order_summary_asset_key


def model_func(x, a, b):
56 changes: 56 additions & 0 deletions examples/assets_dbt_python/assets_dbt_python/definitions.py
@@ -0,0 +1,56 @@
import os

from dagster import (
Definitions,
FilesystemIOManager,
ScheduleDefinition,
define_asset_job,
load_assets_from_modules,
)
from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import (
build_sensor_for_freshness_checks,
)
from dagster_duckdb_pandas import DuckDBPandasIOManager

from .assets import forecasting, raw_data
from .assets.dbt import DBT_PROJECT_DIR, dbt_project_assets, dbt_resource

raw_data_assets = load_assets_from_modules(
[raw_data],
group_name="raw_data",
# all of these assets live in the duckdb database, under the schema raw_data
key_prefix=["duckdb", "raw_data"],
)

forecasting_assets = load_assets_from_modules(
[forecasting],
group_name="forecasting",
)
all_assets_checks = [*forecasting.forecasting_freshness_checks]

# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason.
freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=all_assets_checks)

# define jobs as selections over the larger graph
everything_job = define_asset_job("everything_everywhere_job", selection="*")
forecast_job = define_asset_job("refresh_forecast_model_job", selection="*order_forecast_model")

resources = {
# this io_manager allows us to load dbt models as pandas dataframes
"io_manager": DuckDBPandasIOManager(database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")),
# this io_manager is responsible for storing/loading our pickled machine learning model
"model_io_manager": FilesystemIOManager(),
# this resource is used to execute dbt cli commands
"dbt": dbt_resource,
}

defs = Definitions(
assets=[dbt_project_assets, *forecasting_assets],
resources=resources,
asset_checks=all_assets_checks,
schedules=[
ScheduleDefinition(job=everything_job, cron_schedule="@weekly"),
ScheduleDefinition(job=forecast_job, cron_schedule="@daily"),
],
sensors=[freshness_check_sensor],
)
@@ -1,4 +1,4 @@
-from assets_dbt_python import defs
+from assets_dbt_python.definitions import defs


def test_def_can_load():
3 changes: 2 additions & 1 deletion examples/assets_dbt_python/pyproject.toml
@@ -3,4 +3,5 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "assets_dbt_python"
module_name = "assets_dbt_python.definitions"
code_location_name = "assets_dbt_python"
@@ -1,14 +0,0 @@
from dagster import Definitions, load_assets_from_modules
from dagster_duckdb import build_duckdb_io_manager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

from . import assets
from .release_sensor import release_sensor

duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

defs = Definitions(
assets=load_assets_from_modules([assets]),
sensors=[release_sensor],
resources={"warehouse": duckdb_io_manager.configured({"database": "releases.duckdb"})},
)
@@ -0,0 +1,14 @@
from dagster import Definitions, load_assets_from_modules
from dagster_duckdb import build_duckdb_io_manager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

from . import assets
from .release_sensor import release_sensor

duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

defs = Definitions(
assets=load_assets_from_modules([assets]),
sensors=[release_sensor],
resources={"warehouse": duckdb_io_manager.configured({"database": "releases.duckdb"})},
)
3 changes: 2 additions & 1 deletion examples/assets_dynamic_partitions/pyproject.toml
@@ -3,4 +3,5 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "assets_dynamic_partitions"
module_name = "assets_dynamic_partitions.definitions"
code_location_name = "assets_dynamic_partitions"
@@ -1,35 +0,0 @@
from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
)
from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import (
build_sensor_for_freshness_checks,
)
from dagster_airbyte import AirbyteResource

from . import assets
from .assets.forecasting import freshness_checks
from .db_io_manager import DbIOManager
from .utils.constants import AIRBYTE_CONFIG, POSTGRES_CONFIG, dbt_resource

# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason.
freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=freshness_checks)

defs = Definitions(
assets=load_assets_from_package_module(assets),
resources={
"airbyte": AirbyteResource(**AIRBYTE_CONFIG),
"dbt": dbt_resource,
"db_io_manager": DbIOManager(**POSTGRES_CONFIG),
},
asset_checks=freshness_checks,
schedules=[
# update all assets once a day
ScheduleDefinition(
job=define_asset_job("all_assets", selection="*"), cron_schedule="@daily"
),
],
sensors=[freshness_check_sensor],
)
@@ -0,0 +1,35 @@
from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
)
from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import (
build_sensor_for_freshness_checks,
)
from dagster_airbyte import AirbyteResource

from . import assets
from .assets.forecasting import freshness_checks
from .db_io_manager import DbIOManager
from .utils.constants import AIRBYTE_CONFIG, POSTGRES_CONFIG, dbt_resource

# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason.
freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=freshness_checks)

defs = Definitions(
assets=load_assets_from_package_module(assets),
resources={
"airbyte": AirbyteResource(**AIRBYTE_CONFIG),
"dbt": dbt_resource,
"db_io_manager": DbIOManager(**POSTGRES_CONFIG),
},
asset_checks=freshness_checks,
schedules=[
# update all assets once a day
ScheduleDefinition(
job=define_asset_job("all_assets", selection="*"), cron_schedule="@daily"
),
],
sensors=[freshness_check_sensor],
)
@@ -1,4 +1,4 @@
-from assets_modern_data_stack import defs
+from assets_modern_data_stack.definitions import defs


def test_defs_can_load():
3 changes: 2 additions & 1 deletion examples/assets_modern_data_stack/pyproject.toml
@@ -3,4 +3,5 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "assets_modern_data_stack"
module_name = "assets_modern_data_stack.definitions"
code_location_name = "assets_modern_data_stack"
41 changes: 0 additions & 41 deletions examples/assets_pandas_pyspark/assets_pandas_pyspark/__init__.py
@@ -1,42 +1 @@
def get_weather_defs():
# gather_assets_start

# __init__.py
from dagster import Definitions, load_assets_from_modules

from .assets import table_assets
from .local_filesystem_io_manager import LocalFileSystemIOManager

defs = Definitions(
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
assets=load_assets_from_modules([table_assets]),
resources={
"io_manager": LocalFileSystemIOManager(),
},
)
# gather_assets_end

return defs


def get_spark_weather_defs():
# gather_spark_assets_start

# __init__.py

from dagster import Definitions, load_assets_from_modules

from .assets import spark_asset, table_assets
from .local_spark_filesystem_io_manager import LocalFileSystemIOManager

defs = Definitions(
assets=load_assets_from_modules([table_assets, spark_asset]),
resources={"io_manager": LocalFileSystemIOManager()},
)
# gather_spark_assets_end

return defs


defs = get_spark_weather_defs()
@@ -0,0 +1,42 @@
def get_weather_defs():
# gather_assets_start

# __init__.py
from dagster import Definitions, load_assets_from_modules

from .assets import table_assets
from .local_filesystem_io_manager import LocalFileSystemIOManager

defs = Definitions(
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
assets=load_assets_from_modules([table_assets]),
resources={
"io_manager": LocalFileSystemIOManager(),
},
)
# gather_assets_end

return defs


def get_spark_weather_defs():
# gather_spark_assets_start

# __init__.py

from dagster import Definitions, load_assets_from_modules

from .assets import spark_asset, table_assets
from .local_spark_filesystem_io_manager import LocalFileSystemIOManager

defs = Definitions(
assets=load_assets_from_modules([table_assets, spark_asset]),
resources={"io_manager": LocalFileSystemIOManager()},
)
# gather_spark_assets_end

return defs


defs = get_spark_weather_defs()
@@ -1,4 +1,4 @@
-from assets_pandas_pyspark import get_spark_weather_defs, get_weather_defs
+from assets_pandas_pyspark.definitions import get_spark_weather_defs, get_weather_defs


def test_weather_defs_can_load():
3 changes: 2 additions & 1 deletion examples/assets_pandas_pyspark/pyproject.toml
@@ -3,4 +3,5 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "assets_pandas_pyspark"
module_name = "assets_pandas_pyspark.definitions"
code_location_name = "assets_pandas_pyspark"
@@ -1,11 +0,0 @@
from dagster import Definitions, load_assets_from_package_module

from . import (
assets,
lib as lib,
)
from .resources.csv_io_manager import LocalCsvIOManager

defs = Definitions(
assets=load_assets_from_package_module(assets), resources={"io_manager": LocalCsvIOManager()}
)

1 comment on commit 8bd0d1e

@github-actions


Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-iln4bt5db-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 8bd0d1e.
This pull request is being automatically deployed with vercel-action
