diff --git a/docs/content/guides/dagster/software-defined-assets.mdx b/docs/content/guides/dagster/software-defined-assets.mdx index d408d7e5e7605..bd1ba81644718 100644 --- a/docs/content/guides/dagster/software-defined-assets.mdx +++ b/docs/content/guides/dagster/software-defined-assets.mdx @@ -70,7 +70,7 @@ To load definitions such as assets and resources, we use object. -```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/__init__.py startafter=gather_assets_start endbefore=gather_assets_end +```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/definitions.py startafter=gather_assets_start endbefore=gather_assets_end # __init__.py from dagster import Definitions, load_assets_from_modules @@ -145,7 +145,7 @@ def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF: Here's an extended version of `weather_assets` that contains the new asset: -```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/__init__.py startafter=gather_spark_assets_start endbefore=gather_spark_assets_end +```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/definitions.py startafter=gather_spark_assets_start endbefore=gather_spark_assets_end # __init__.py from dagster import Definitions, load_assets_from_modules diff --git a/examples/assets_dbt_python/assets_dbt_python/__init__.py b/examples/assets_dbt_python/assets_dbt_python/__init__.py index 5bac27a95cfad..e69de29bb2d1d 100644 --- a/examples/assets_dbt_python/assets_dbt_python/__init__.py +++ b/examples/assets_dbt_python/assets_dbt_python/__init__.py @@ -1,56 +0,0 @@ -import os - -from dagster import ( - Definitions, - FilesystemIOManager, - ScheduleDefinition, - define_asset_job, - load_assets_from_package_module, -) -from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import ( - build_sensor_for_freshness_checks, -) -from dagster_duckdb_pandas import DuckDBPandasIOManager - -from .assets import forecasting, raw_data -from .assets.dbt import DBT_PROJECT_DIR, dbt_project_assets, dbt_resource - -raw_data_assets = load_assets_from_package_module( - raw_data, - group_name="raw_data", - # all of these assets live in the duckdb database, under the schema raw_data - key_prefix=["duckdb", "raw_data"], -) - -forecasting_assets = load_assets_from_package_module( - forecasting, - group_name="forecasting", -) -all_assets_checks = [*forecasting.forecasting_freshness_checks] - -# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason. -freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=all_assets_checks) - -# define jobs as selections over the larger graph -everything_job = define_asset_job("everything_everywhere_job", selection="*") -forecast_job = define_asset_job("refresh_forecast_model_job", selection="*order_forecast_model") - -resources = { - # this io_manager allows us to load dbt models as pandas dataframes - "io_manager": DuckDBPandasIOManager(database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")), - # this io_manager is responsible for storing/loading our pickled machine learning model - "model_io_manager": FilesystemIOManager(), - # this resource is used to execute dbt cli commands - "dbt": dbt_resource, -} - -defs = Definitions( - assets=[dbt_project_assets, *raw_data_assets, *forecasting_assets], - resources=resources, - asset_checks=all_assets_checks, - schedules=[ - ScheduleDefinition(job=everything_job, cron_schedule="@weekly"), - ScheduleDefinition(job=forecast_job, cron_schedule="@daily"), - ], - sensors=[freshness_check_sensor], -) diff --git a/examples/assets_dbt_python/assets_dbt_python/assets/dbt/__init__.py b/examples/assets_dbt_python/assets_dbt_python/assets/dbt.py similarity index 94% rename from examples/assets_dbt_python/assets_dbt_python/assets/dbt/__init__.py rename to examples/assets_dbt_python/assets_dbt_python/assets/dbt.py index 5db796fc9b4ec..b9929dbe3af37 100644 --- a/examples/assets_dbt_python/assets_dbt_python/assets/dbt/__init__.py +++ b/examples/assets_dbt_python/assets_dbt_python/assets/dbt.py @@ -3,7 +3,7 @@ from dagster import AssetExecutionContext, AssetKey, file_relative_path from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets, get_asset_key_for_model -DBT_PROJECT_DIR = file_relative_path(__file__, "../../../dbt_project") +DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project") dbt_resource = DbtCliResource(project_dir=DBT_PROJECT_DIR) dbt_parse_invocation = dbt_resource.cli(["parse"]).wait() diff --git a/examples/assets_dbt_python/assets_dbt_python/assets/forecasting/__init__.py b/examples/assets_dbt_python/assets_dbt_python/assets/forecasting.py similarity index 97% rename from examples/assets_dbt_python/assets_dbt_python/assets/forecasting/__init__.py rename to examples/assets_dbt_python/assets_dbt_python/assets/forecasting.py index b942ed8e92b88..55b39956bf52f 100644 --- a/examples/assets_dbt_python/assets_dbt_python/assets/forecasting/__init__.py +++ b/examples/assets_dbt_python/assets_dbt_python/assets/forecasting.py @@ -6,7 +6,7 @@ from dagster import AssetIn, asset, build_last_update_freshness_checks from scipy import optimize -from ..dbt import daily_order_summary_asset_key +from .dbt import daily_order_summary_asset_key def model_func(x, a, b): diff --git a/examples/assets_dbt_python/assets_dbt_python/assets/raw_data/__init__.py b/examples/assets_dbt_python/assets_dbt_python/assets/raw_data.py similarity index 100% rename from examples/assets_dbt_python/assets_dbt_python/assets/raw_data/__init__.py rename to examples/assets_dbt_python/assets_dbt_python/assets/raw_data.py diff --git a/examples/assets_dbt_python/assets_dbt_python/definitions.py b/examples/assets_dbt_python/assets_dbt_python/definitions.py new file mode 100644 index 0000000000000..ab97f4834da8d --- /dev/null +++ b/examples/assets_dbt_python/assets_dbt_python/definitions.py @@ -0,0 +1,56 @@ +import os + +from dagster import ( + Definitions, + FilesystemIOManager, + ScheduleDefinition, + define_asset_job, + load_assets_from_modules, +) +from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import ( + build_sensor_for_freshness_checks, +) +from dagster_duckdb_pandas import DuckDBPandasIOManager + +from .assets import forecasting, raw_data +from .assets.dbt import DBT_PROJECT_DIR, dbt_project_assets, dbt_resource + +raw_data_assets = load_assets_from_modules( + [raw_data], + group_name="raw_data", + # all of these assets live in the duckdb database, under the schema raw_data + key_prefix=["duckdb", "raw_data"], +) + +forecasting_assets = load_assets_from_modules( + [forecasting], + group_name="forecasting", +) +all_assets_checks = [*forecasting.forecasting_freshness_checks] + +# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason. +freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=all_assets_checks) + +# define jobs as selections over the larger graph +everything_job = define_asset_job("everything_everywhere_job", selection="*") +forecast_job = define_asset_job("refresh_forecast_model_job", selection="*order_forecast_model") + +resources = { + # this io_manager allows us to load dbt models as pandas dataframes + "io_manager": DuckDBPandasIOManager(database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")), + # this io_manager is responsible for storing/loading our pickled machine learning model + "model_io_manager": FilesystemIOManager(), + # this resource is used to execute dbt cli commands + "dbt": dbt_resource, +} + +defs = Definitions( + assets=[dbt_project_assets, *forecasting_assets], + resources=resources, + asset_checks=all_assets_checks, + schedules=[ + ScheduleDefinition(job=everything_job, cron_schedule="@weekly"), + ScheduleDefinition(job=forecast_job, cron_schedule="@daily"), + ], + sensors=[freshness_check_sensor], +) diff --git a/examples/assets_dbt_python/assets_dbt_python/utils/__init__.py b/examples/assets_dbt_python/assets_dbt_python/utils.py similarity index 100% rename from examples/assets_dbt_python/assets_dbt_python/utils/__init__.py rename to examples/assets_dbt_python/assets_dbt_python/utils.py diff --git a/examples/assets_dbt_python/assets_dbt_python_tests/test_defs.py b/examples/assets_dbt_python/assets_dbt_python_tests/test_defs.py index 9b0164eb89dfa..43a1207ad6397 100644 --- a/examples/assets_dbt_python/assets_dbt_python_tests/test_defs.py +++ b/examples/assets_dbt_python/assets_dbt_python_tests/test_defs.py @@ -1,4 +1,4 @@ -from assets_dbt_python import defs +from assets_dbt_python.definitions import defs def test_def_can_load(): diff --git a/examples/assets_dbt_python/pyproject.toml b/examples/assets_dbt_python/pyproject.toml index 5ed18b24a1860..f5fdf2bf897fd 100644 --- a/examples/assets_dbt_python/pyproject.toml +++ b/examples/assets_dbt_python/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_dbt_python" +module_name = "assets_dbt_python.definitions" +code_location_name = "assets_dbt_python" diff --git a/examples/assets_dynamic_partitions/assets_dynamic_partitions/__init__.py b/examples/assets_dynamic_partitions/assets_dynamic_partitions/__init__.py index 99cfed08c38b1..e69de29bb2d1d 100644 --- a/examples/assets_dynamic_partitions/assets_dynamic_partitions/__init__.py +++ b/examples/assets_dynamic_partitions/assets_dynamic_partitions/__init__.py @@ -1,14 +0,0 @@ -from dagster import Definitions, load_assets_from_modules -from dagster_duckdb import build_duckdb_io_manager -from dagster_duckdb_pandas import DuckDBPandasTypeHandler - -from . import assets -from .release_sensor import release_sensor - -duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()]) - -defs = Definitions( - assets=load_assets_from_modules([assets]), - sensors=[release_sensor], - resources={"warehouse": duckdb_io_manager.configured({"database": "releases.duckdb"})}, -) diff --git a/examples/assets_dynamic_partitions/assets_dynamic_partitions/definitions.py b/examples/assets_dynamic_partitions/assets_dynamic_partitions/definitions.py new file mode 100644 index 0000000000000..99cfed08c38b1 --- /dev/null +++ b/examples/assets_dynamic_partitions/assets_dynamic_partitions/definitions.py @@ -0,0 +1,14 @@ +from dagster import Definitions, load_assets_from_modules +from dagster_duckdb import build_duckdb_io_manager +from dagster_duckdb_pandas import DuckDBPandasTypeHandler + +from . import assets +from .release_sensor import release_sensor + +duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()]) + +defs = Definitions( + assets=load_assets_from_modules([assets]), + sensors=[release_sensor], + resources={"warehouse": duckdb_io_manager.configured({"database": "releases.duckdb"})}, +) diff --git a/examples/assets_dynamic_partitions/pyproject.toml b/examples/assets_dynamic_partitions/pyproject.toml index 3c76b8e6875ee..e078eb17a0843 100644 --- a/examples/assets_dynamic_partitions/pyproject.toml +++ b/examples/assets_dynamic_partitions/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_dynamic_partitions" +module_name = "assets_dynamic_partitions.definitions" +code_location_name = "assets_dynamic_partitions" diff --git a/examples/assets_modern_data_stack/assets_modern_data_stack/__init__.py b/examples/assets_modern_data_stack/assets_modern_data_stack/__init__.py index 16b85cd7adc9c..e69de29bb2d1d 100644 --- a/examples/assets_modern_data_stack/assets_modern_data_stack/__init__.py +++ b/examples/assets_modern_data_stack/assets_modern_data_stack/__init__.py @@ -1,35 +0,0 @@ -from dagster import ( - Definitions, - ScheduleDefinition, - define_asset_job, - load_assets_from_package_module, -) -from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import ( - build_sensor_for_freshness_checks, -) -from dagster_airbyte import AirbyteResource - -from . import assets -from .assets.forecasting import freshness_checks -from .db_io_manager import DbIOManager -from .utils.constants import AIRBYTE_CONFIG, POSTGRES_CONFIG, dbt_resource - -# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason. -freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=freshness_checks) - -defs = Definitions( - assets=load_assets_from_package_module(assets), - resources={ - "airbyte": AirbyteResource(**AIRBYTE_CONFIG), - "dbt": dbt_resource, - "db_io_manager": DbIOManager(**POSTGRES_CONFIG), - }, - asset_checks=freshness_checks, - schedules=[ - # update all assets once a day - ScheduleDefinition( - job=define_asset_job("all_assets", selection="*"), cron_schedule="@daily" - ), - ], - sensors=[freshness_check_sensor], -) diff --git a/examples/assets_modern_data_stack/assets_modern_data_stack/definitions.py b/examples/assets_modern_data_stack/assets_modern_data_stack/definitions.py new file mode 100644 index 0000000000000..16b85cd7adc9c --- /dev/null +++ b/examples/assets_modern_data_stack/assets_modern_data_stack/definitions.py @@ -0,0 +1,35 @@ +from dagster import ( + Definitions, + ScheduleDefinition, + define_asset_job, + load_assets_from_package_module, +) +from dagster._core.definitions.asset_check_factories.freshness_checks.sensor import ( + build_sensor_for_freshness_checks, +) +from dagster_airbyte import AirbyteResource + +from . import assets +from .assets.forecasting import freshness_checks +from .db_io_manager import DbIOManager +from .utils.constants import AIRBYTE_CONFIG, POSTGRES_CONFIG, dbt_resource + +# The freshness check sensor will run our freshness checks even if the underlying asset fails to run, for whatever reason. +freshness_check_sensor = build_sensor_for_freshness_checks(freshness_checks=freshness_checks) + +defs = Definitions( + assets=load_assets_from_package_module(assets), + resources={ + "airbyte": AirbyteResource(**AIRBYTE_CONFIG), + "dbt": dbt_resource, + "db_io_manager": DbIOManager(**POSTGRES_CONFIG), + }, + asset_checks=freshness_checks, + schedules=[ + # update all assets once a day + ScheduleDefinition( + job=define_asset_job("all_assets", selection="*"), cron_schedule="@daily" + ), + ], + sensors=[freshness_check_sensor], +) diff --git a/examples/assets_modern_data_stack/assets_modern_data_stack_tests/test_defs.py b/examples/assets_modern_data_stack/assets_modern_data_stack_tests/test_defs.py index e17d3e985c865..92a5ddc38bd86 100644 --- a/examples/assets_modern_data_stack/assets_modern_data_stack_tests/test_defs.py +++ b/examples/assets_modern_data_stack/assets_modern_data_stack_tests/test_defs.py @@ -1,4 +1,4 @@ -from assets_modern_data_stack import defs +from assets_modern_data_stack.definitions import defs def test_defs_can_load(): diff --git a/examples/assets_modern_data_stack/pyproject.toml b/examples/assets_modern_data_stack/pyproject.toml index 54f6598490573..ef324b1d09174 100644 --- a/examples/assets_modern_data_stack/pyproject.toml +++ b/examples/assets_modern_data_stack/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_modern_data_stack" +module_name = "assets_modern_data_stack.definitions" +code_location_name = "assets_modern_data_stack" diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark/__init__.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark/__init__.py index c2b0d0ac7e110..8b137891791fe 100644 --- a/examples/assets_pandas_pyspark/assets_pandas_pyspark/__init__.py +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark/__init__.py @@ -1,42 +1 @@ -def get_weather_defs(): - # gather_assets_start - # __init__.py - from dagster import Definitions, load_assets_from_modules - - from .assets import table_assets - from .local_filesystem_io_manager import LocalFileSystemIOManager - - defs = Definitions( - # imports the module called "assets" from the package containing the current module - # the "assets" module contains the asset definitions - assets=load_assets_from_modules([table_assets]), - resources={ - "io_manager": LocalFileSystemIOManager(), - }, - ) - # gather_assets_end - - return defs - - -def get_spark_weather_defs(): - # gather_spark_assets_start - - # __init__.py - - from dagster import Definitions, load_assets_from_modules - - from .assets import spark_asset, table_assets - from .local_spark_filesystem_io_manager import LocalFileSystemIOManager - - defs = Definitions( - assets=load_assets_from_modules([table_assets, spark_asset]), - resources={"io_manager": LocalFileSystemIOManager()}, - ) - # gather_spark_assets_end - - return defs - - -defs = get_spark_weather_defs() diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark/definitions.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark/definitions.py new file mode 100644 index 0000000000000..c2b0d0ac7e110 --- /dev/null +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark/definitions.py @@ -0,0 +1,42 @@ +def get_weather_defs(): + # gather_assets_start + + # __init__.py + from dagster import Definitions, load_assets_from_modules + + from .assets import table_assets + from .local_filesystem_io_manager import LocalFileSystemIOManager + + defs = Definitions( + # imports the module called "assets" from the package containing the current module + # the "assets" module contains the asset definitions + assets=load_assets_from_modules([table_assets]), + resources={ + "io_manager": LocalFileSystemIOManager(), + }, + ) + # gather_assets_end + + return defs + + +def get_spark_weather_defs(): + # gather_spark_assets_start + + # __init__.py + + from dagster import Definitions, load_assets_from_modules + + from .assets import spark_asset, table_assets + from .local_spark_filesystem_io_manager import LocalFileSystemIOManager + + defs = Definitions( + assets=load_assets_from_modules([table_assets, spark_asset]), + resources={"io_manager": LocalFileSystemIOManager()}, + ) + # gather_spark_assets_end + + return defs + + +defs = get_spark_weather_defs() diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_defs.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_defs.py index bf09c631266e9..eb4123dda1d38 100644 --- a/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_defs.py +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_defs.py @@ -1,4 +1,4 @@ -from assets_pandas_pyspark import get_spark_weather_defs, get_weather_defs +from assets_pandas_pyspark.definitions import get_spark_weather_defs, get_weather_defs def test_weather_defs_can_load(): diff --git a/examples/assets_pandas_pyspark/pyproject.toml b/examples/assets_pandas_pyspark/pyproject.toml index 557c1be50f1d1..af5bb9b1aebc4 100644 --- a/examples/assets_pandas_pyspark/pyproject.toml +++ b/examples/assets_pandas_pyspark/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_pandas_pyspark" +module_name = "assets_pandas_pyspark.definitions" +code_location_name = "assets_pandas_pyspark" diff --git a/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/__init__.py b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/__init__.py index 2c149891d4df9..e69de29bb2d1d 100644 --- a/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/__init__.py +++ b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/__init__.py @@ -1,11 +0,0 @@ -from dagster import Definitions, load_assets_from_package_module - -from . import ( - assets, - lib as lib, -) -from .resources.csv_io_manager import LocalCsvIOManager - -defs = Definitions( - assets=load_assets_from_package_module(assets), resources={"io_manager": LocalCsvIOManager()} -) diff --git a/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/definitions.py b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/definitions.py new file mode 100644 index 0000000000000..2c149891d4df9 --- /dev/null +++ b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata/definitions.py @@ -0,0 +1,11 @@ +from dagster import Definitions, load_assets_from_package_module + +from . import ( + assets, + lib as lib, +) +from .resources.csv_io_manager import LocalCsvIOManager + +defs = Definitions( + assets=load_assets_from_package_module(assets), resources={"io_manager": LocalCsvIOManager()} +) diff --git a/examples/assets_pandas_type_metadata/assets_pandas_type_metadata_tests/test_defs.py b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata_tests/test_defs.py index a091b0a32c59f..f85b66e4b06de 100644 --- a/examples/assets_pandas_type_metadata/assets_pandas_type_metadata_tests/test_defs.py +++ b/examples/assets_pandas_type_metadata/assets_pandas_type_metadata_tests/test_defs.py @@ -1,4 +1,4 @@ -from assets_pandas_type_metadata import defs +from assets_pandas_type_metadata.definitions import defs def test_defs_can_load(): diff --git a/examples/assets_pandas_type_metadata/pyproject.toml b/examples/assets_pandas_type_metadata/pyproject.toml index 349312c5b674a..e0843eb9285c4 100644 --- a/examples/assets_pandas_type_metadata/pyproject.toml +++ b/examples/assets_pandas_type_metadata/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_pandas_type_metadata" \ No newline at end of file +module_name = "assets_pandas_type_metadata.definitions" +code_location_name = "assets_pandas_type_metadata" \ No newline at end of file diff --git a/examples/assets_smoke_test/assets_smoke_test/assets/__init__.py b/examples/assets_smoke_test/assets_smoke_test/assets/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/assets_smoke_test/assets_smoke_test/pure_python_assets.py b/examples/assets_smoke_test/assets_smoke_test/assets/pure_python_assets.py similarity index 100% rename from examples/assets_smoke_test/assets_smoke_test/pure_python_assets.py rename to examples/assets_smoke_test/assets_smoke_test/assets/pure_python_assets.py diff --git a/examples/assets_smoke_test/assets_smoke_test/python_and_dbt_assets.py b/examples/assets_smoke_test/assets_smoke_test/assets/python_and_dbt_assets.py similarity index 65% rename from examples/assets_smoke_test/assets_smoke_test/python_and_dbt_assets.py rename to examples/assets_smoke_test/assets_smoke_test/assets/python_and_dbt_assets.py index e754297b02b19..0380f1f29f465 100644 --- a/examples/assets_smoke_test/assets_smoke_test/python_and_dbt_assets.py +++ b/examples/assets_smoke_test/assets_smoke_test/assets/python_and_dbt_assets.py @@ -1,18 +1,10 @@ -from dagster import ( - Definitions, - EnvVar, - SourceAsset, - TableSchema, - asset, - load_assets_from_current_module, -) +from dagster import SourceAsset, TableSchema, asset from dagster._core.execution.context.compute import AssetExecutionContext from dagster._utils import file_relative_path from dagster_dbt import DbtCliResource, dbt_assets -from dagster_snowflake_pandas import SnowflakePandasIOManager from pandas import DataFrame -DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project") +DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project") dbt_resource = DbtCliResource(project_dir=DBT_PROJECT_DIR) dbt_parse_invocation = dbt_resource.cli(["parse"]).wait() dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json") @@ -45,18 +37,3 @@ def country_stats(country_populations: DataFrame, continent_stats: DataFrame) -> @dbt_assets(manifest=dbt_manifest_path) def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() - - -defs = Definitions( - assets=load_assets_from_current_module(), - resources={ - "io_manager": SnowflakePandasIOManager( - account=EnvVar("SNOWFLAKE_ACCOUNT"), - user=EnvVar("SNOWFLAKE_USER"), - password=EnvVar("SNOWFLAKE_PASSWORD"), - database=EnvVar("SNOWFLAKE_DATABASE"), - warehouse=EnvVar("SNOWFLAKE_WAREHOUSE"), - ), - "dbt": dbt_resource, - }, -) diff --git a/examples/assets_smoke_test/assets_smoke_test/definitions.py b/examples/assets_smoke_test/assets_smoke_test/definitions.py new file mode 100644 index 0000000000000..bce24a40f22ae --- /dev/null +++ b/examples/assets_smoke_test/assets_smoke_test/definitions.py @@ -0,0 +1,21 @@ +from dagster import Definitions, EnvVar, load_assets_from_modules +from dagster_snowflake_pandas import SnowflakePandasIOManager + +from .assets import python_and_dbt_assets +from .assets.python_and_dbt_assets import dbt_resource + +defs = Definitions( + assets=load_assets_from_modules( + modules=[python_and_dbt_assets], + ), + resources={ + "io_manager": SnowflakePandasIOManager( + account=EnvVar("SNOWFLAKE_ACCOUNT"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + database=EnvVar("SNOWFLAKE_DATABASE"), + warehouse=EnvVar("SNOWFLAKE_WAREHOUSE"), + ), + "dbt": dbt_resource, + }, +) diff --git a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py index 3bfea5a7f2ae1..8730f7fd02aee 100644 --- a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py +++ b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py @@ -1,7 +1,7 @@ from dagster import InMemoryIOManager, TableSchema, load_assets_from_modules, materialize from pandas import DataFrame, Series -from assets_smoke_test import pure_python_assets +from assets_smoke_test.assets import pure_python_assets def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame: diff --git a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_python_and_dbt_assets.py b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_python_and_dbt_assets.py index 9d93912969312..c5d3322c795f3 100644 --- a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_python_and_dbt_assets.py +++ b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_python_and_dbt_assets.py @@ -5,8 +5,8 @@ from dagster_dbt import DbtCliResource from dagster_snowflake_pandas import SnowflakePandasIOManager -from assets_smoke_test import python_and_dbt_assets -from assets_smoke_test.python_and_dbt_assets import DBT_PROJECT_DIR, raw_country_populations +from assets_smoke_test.assets import python_and_dbt_assets +from assets_smoke_test.assets.python_and_dbt_assets import DBT_PROJECT_DIR, raw_country_populations def smoke_all_test(): diff --git a/examples/assets_smoke_test/pyproject.toml b/examples/assets_smoke_test/pyproject.toml index f6f5925276192..0905351f906c9 100644 --- a/examples/assets_smoke_test/pyproject.toml +++ b/examples/assets_smoke_test/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_smoke_test.python_and_dbt_assets" +module_name = "assets_smoke_test.definitions" +code_location_name = "assets_smoke_test" diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py index 07a903cd005f3..8b137891791fe 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py @@ -1,10 +1 @@ -from dagster import Definitions -from dagster._core.pipes.subprocess import PipesSubprocessClient -from .domain_specific_dsl.stocks_dsl import get_stocks_dsl_example_defs -from .pure_assets_dsl.assets_dsl import get_asset_dsl_example_defs - -defs = Definitions( - assets=get_asset_dsl_example_defs() + get_stocks_dsl_example_defs(), - resources={"pipes_subprocess_client": PipesSubprocessClient()}, -) diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/definitions.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/definitions.py new file mode 100644 index 0000000000000..07a903cd005f3 --- /dev/null +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/definitions.py @@ -0,0 +1,10 @@ +from dagster import Definitions +from dagster._core.pipes.subprocess import PipesSubprocessClient + +from .domain_specific_dsl.stocks_dsl import get_stocks_dsl_example_defs +from .pure_assets_dsl.assets_dsl import get_asset_dsl_example_defs + +defs = Definitions( + assets=get_asset_dsl_example_defs() + get_stocks_dsl_example_defs(), + resources={"pipes_subprocess_client": PipesSubprocessClient()}, +) diff --git a/examples/experimental/assets_yaml_dsl/pyproject.toml b/examples/experimental/assets_yaml_dsl/pyproject.toml index bf45782c79cf7..abdf8f2bb5a27 100644 --- a/examples/experimental/assets_yaml_dsl/pyproject.toml +++ b/examples/experimental/assets_yaml_dsl/pyproject.toml @@ -3,4 +3,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "assets_yaml_dsl" +module_name = "assets_yaml_dsl.definitions" +code_location_name = "assets_yaml_dsl" \ No newline at end of file diff --git a/examples/experimental/external_assets/external_assets/__init__.py b/examples/experimental/external_assets/external_assets/__init__.py index 5ad6ee31c90f3..e69de29bb2d1d 100644 --- a/examples/experimental/external_assets/external_assets/__init__.py +++ b/examples/experimental/external_assets/external_assets/__init__.py @@ -1,27 +0,0 @@ -import os - -import kubernetes -from dagster import Definitions -from dagster_k8s import PipesK8sClient - -from .external_assets import external_asset_defs -from .pipes import ( - telem_post_processing, - telem_post_processing_check, - telem_post_processing_job, - telem_post_processing_sensor, -) - -config_file = os.path.expanduser("~/.kube/config") - -kubernetes.config.load_kube_config(config_file) - -defs = Definitions( - assets=[*external_asset_defs, telem_post_processing], - sensors=[telem_post_processing_sensor], - jobs=[telem_post_processing_job], - asset_checks=[telem_post_processing_check], - resources={ - "k8s_pipes_client": PipesK8sClient(), - }, -) diff --git a/examples/experimental/external_assets/external_assets/definitions.py b/examples/experimental/external_assets/external_assets/definitions.py new file mode 100644 index 0000000000000..5ad6ee31c90f3 --- /dev/null +++ b/examples/experimental/external_assets/external_assets/definitions.py @@ -0,0 +1,27 @@ +import os + +import kubernetes +from dagster import Definitions +from dagster_k8s import PipesK8sClient + +from .external_assets import external_asset_defs +from .pipes import ( + telem_post_processing, + telem_post_processing_check, + telem_post_processing_job, + telem_post_processing_sensor, +) + +config_file = os.path.expanduser("~/.kube/config") + +kubernetes.config.load_kube_config(config_file) + +defs = Definitions( + assets=[*external_asset_defs, telem_post_processing], + sensors=[telem_post_processing_sensor], + jobs=[telem_post_processing_job], + asset_checks=[telem_post_processing_check], + resources={ + "k8s_pipes_client": PipesK8sClient(), + }, +) diff --git a/examples/experimental/external_assets/workspace.yaml b/examples/experimental/external_assets/workspace.yaml index 11aa295acdcae..76164728d8da7 100644 --- a/examples/experimental/external_assets/workspace.yaml +++ b/examples/experimental/external_assets/workspace.yaml @@ -1,2 +1,4 @@ load_from: - - python_package: external_assets + - python_package: + package_name: external_assets.definitions + location_name: external_assets diff --git a/examples/experimental/sling_decorator/pyproject.toml b/examples/experimental/sling_decorator/pyproject.toml index 0048c76d54383..c6770877cede7 100644 --- a/examples/experimental/sling_decorator/pyproject.toml +++ b/examples/experimental/sling_decorator/pyproject.toml @@ -12,4 +12,5 @@ dependencies = [ packages = ["sling_decorator"] [tool.dagster] -module_name = "sling_decorator" \ No newline at end of file +module_name = "sling_decorator.definitions" +code_location_name = "sling_decorator" \ No newline at end of file diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index 20e07eda7efc0..e69de29bb2d1d 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -1,37 +0,0 @@ -from dagster import Definitions, file_relative_path -from dagster_embedded_elt.sling import sling_assets -from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource - -replication_config = file_relative_path(__file__, "../sling_replication.yaml") - -sling_resource = SlingResource( - connections=[ - SlingConnectionResource( - name="MY_POSTGRES", - type="postgres", - connection_string="postgres://postgres:postgres@localhost:54321/finance?sslmode=disable", - ), - SlingConnectionResource( - name="MY_DUCKDB", - type="duckdb", - connection_string="duckdb:///var/tmp/duckdb.db", - ), - ] -) - - -@sling_assets(replication_config=replication_config) -def my_assets(context, sling: SlingResource): - yield from sling.replicate(context=context) - for row in sling.stream_raw_logs(): - context.log.info(row) - - -defs = Definitions( - assets=[ - my_assets, - ], - resources={ - "sling": sling_resource, - }, -) diff --git a/examples/experimental/sling_decorator/sling_decorator/definitions.py b/examples/experimental/sling_decorator/sling_decorator/definitions.py new file mode 100644 index 0000000000000..20e07eda7efc0 --- /dev/null +++ b/examples/experimental/sling_decorator/sling_decorator/definitions.py @@ -0,0 +1,37 @@ +from dagster import Definitions, file_relative_path +from dagster_embedded_elt.sling import sling_assets +from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource + +replication_config = file_relative_path(__file__, "../sling_replication.yaml") + +sling_resource = SlingResource( + connections=[ + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string="postgres://postgres:postgres@localhost:54321/finance?sslmode=disable", + ), + SlingConnectionResource( + name="MY_DUCKDB", + type="duckdb", + connection_string="duckdb:///var/tmp/duckdb.db", + ), + ] +) + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate(context=context) + for row in sling.stream_raw_logs(): + context.log.info(row) + + +defs = Definitions( + assets=[ + my_assets, + ], + resources={ + "sling": sling_resource, + }, +) diff --git a/examples/feature_graph_backed_assets/feature_graph_backed_assets/definitions.py b/examples/feature_graph_backed_assets/feature_graph_backed_assets/definitions.py index a55de7f79c23c..f71c4228d992b 100644 --- a/examples/feature_graph_backed_assets/feature_graph_backed_assets/definitions.py +++ b/examples/feature_graph_backed_assets/feature_graph_backed_assets/definitions.py @@ -9,11 +9,6 @@ from . import assets from .graphs_and_ops import layover_breakdown_2022, us_assets -airline_job = define_asset_job( - "airline_job", AssetSelection.assets("passenger_flights").downstream() -) - - defs = Definitions( assets=[ *load_assets_from_package_module(assets),