diff --git a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx index 8f7737f24b839..c22a3cba5cdea 100644 --- a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx @@ -99,12 +99,12 @@ def events(context: AssetExecutionContext) -> None: ```python file=/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py startafter=start_marker endbefore=end_marker from dagster import ( AssetExecutionContext, + AssetSpec, BackfillPolicy, DailyPartitionsDefinition, InputContext, IOManager, OutputContext, - SourceAsset, asset, ) @@ -121,7 +121,7 @@ class MyIOManager(IOManager): daily_partition = DailyPartitionsDefinition(start_date="2020-01-01") -raw_events = SourceAsset("raw_events", partitions_def=daily_partition) +raw_events = AssetSpec("raw_events", partitions_def=daily_partition) @asset( diff --git a/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx b/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx index c263f3b37e554..143efb7f4e75e 100644 --- a/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx +++ b/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx @@ -219,11 +219,11 @@ Here's what an equivalent job looks like using asset definitions: ```python file=/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py from pandas import DataFrame -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model -raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse") +raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"}) @asset(io_manager_key="warehouse") @@ -301,14 +301,12 @@ Here's an example of an equivalent job that uses asset definitions: ```python file=/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py from pandas import read_sql -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import Definitions, asset, define_asset_job from .mylib import create_db_connection, pickle_to_s3, train_recommender_model -raw_users = SourceAsset(key="raw_users") - -@asset(deps=[raw_users]) +@asset(deps=["raw_users"]) def users() -> None: raw_users_df = read_sql("select * from raw_users", con=create_db_connection()) users_df = raw_users_df.dropna() diff --git a/docs/content/guides/dagster/software-defined-assets.mdx b/docs/content/guides/dagster/software-defined-assets.mdx index 151f0d0f8e43d..8d2ffcbbdabbf 100644 --- a/docs/content/guides/dagster/software-defined-assets.mdx +++ b/docs/content/guides/dagster/software-defined-assets.mdx @@ -29,10 +29,10 @@ Here are our asset definitions that define tables we want to materialize. ```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py startafter=start_marker endbefore=end_marker import pandas as pd -from dagster import AssetKey, SourceAsset, asset +from dagster import AssetKey, AssetSpec, Definitions, asset from pandas import DataFrame -sfo_q2_weather_sample = SourceAsset( +sfo_q2_weather_sample = AssetSpec( key=AssetKey("sfo_q2_weather_sample"), description="Weather samples, taken every five minutes at SFO", metadata={"format": "csv"}, @@ -50,9 +50,12 @@ def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame: def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame: """Computes the 10 hottest dates.""" return daily_temperature_highs.nlargest(10, "max_tmpf") + + +defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates]) ``` -`sfo_q2_weather_sample` represents our base temperature table. It's a , meaning that we rely on it, but don't generate it. +`sfo_q2_weather_sample` represents our base temperature table. It's a table that's used by the assets in our pipeline, but that we're not responsible for generating. We define it using an , which allows us to specify its attributes without providing a function that materializes it. `daily_temperature_highs` represents a computed asset. It's derived by taking the `sfo_q2_weather_sample` table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an . This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse. diff --git a/docs/content/integrations/embedded-elt/dlt.mdx b/docs/content/integrations/embedded-elt/dlt.mdx index e320b777df0f7..eb90a2779c010 100644 --- a/docs/content/integrations/embedded-elt/dlt.mdx +++ b/docs/content/integrations/embedded-elt/dlt.mdx @@ -283,19 +283,19 @@ def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource): In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets. -### Assigning metadata to upstream source assets +### Assigning metadata to upstream external assets -A common question is how to define metadata on the source assets upstream of the dlt assets. +A common question is how to define metadata on the external assets upstream of the dlt assets. -This can be accomplished by defining a with a key that matches the one defined in the method. +This can be accomplished by defining a with a key that matches the one defined in the method. -For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a with attributes like `group_name`. +For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a with attributes like `group_name`. ```python file=/integrations/embedded_elt/dlt_source_assets.py import dlt from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets -from dagster import AssetExecutionContext, SourceAsset +from dagster import AssetExecutionContext, AssetSpec @dlt.source @@ -319,8 +319,7 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource): thinkific_source_assets = [ - SourceAsset(key, group_name="thinkific") - for key in example_dlt_assets.dependency_keys + AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys ] ``` diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py index 9657069aabacc..c58d4d347ea10 100644 --- a/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py @@ -1,9 +1,9 @@ # start_marker import pandas as pd -from dagster import AssetKey, SourceAsset, asset +from dagster import AssetKey, AssetSpec, Definitions, asset from pandas import DataFrame -sfo_q2_weather_sample = SourceAsset( +sfo_q2_weather_sample = AssetSpec( key=AssetKey("sfo_q2_weather_sample"), description="Weather samples, taken every five minutes at SFO", metadata={"format": "csv"}, @@ -23,4 +23,6 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame: return daily_temperature_highs.nlargest(10, "max_tmpf") +defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates]) + # end_marker diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_assets.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_assets.py index d2c5fbfbb1eec..c64395fe8fc25 100644 --- a/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_assets.py +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark_tests/test_assets.py @@ -1,4 +1,4 @@ -from dagster import load_assets_from_modules, materialize +from dagster import materialize from dagster._core.test_utils import instance_for_test from assets_pandas_pyspark.assets import spark_asset, table_assets @@ -9,7 +9,7 @@ def test_weather_assets(): with instance_for_test() as instance: assert materialize( - load_assets_from_modules([table_assets]), + table_assets.defs.assets, instance=instance, resources={"io_manager": LocalFileSystemIOManager()}, ).success @@ -20,7 +20,7 @@ def test_spark_weather_assets(): with instance_for_test() as instance: assert materialize( - load_assets_from_modules([table_assets, spark_asset]), + [*table_assets.defs.assets, spark_asset.daily_temperature_high_diffs], instance=instance, resources={"io_manager": LocalFileSystemIOManager()}, ).success diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py index ce5b4eb313b73..a3ea69d00f26e 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py @@ -1,12 +1,12 @@ # start_marker from dagster import ( AssetExecutionContext, + AssetSpec, BackfillPolicy, DailyPartitionsDefinition, InputContext, IOManager, OutputContext, - SourceAsset, asset, ) @@ -23,7 +23,7 @@ def handle_output(self, context: OutputContext, obj): daily_partition = DailyPartitionsDefinition(start_date="2020-01-01") -raw_events = SourceAsset("raw_events", partitions_def=daily_partition) +raw_events = AssetSpec("raw_events", partitions_def=daily_partition) @asset( diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py index 4527de70a3e90..b28def69310c6 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py @@ -1,10 +1,10 @@ from pandas import DataFrame -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model -raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse") +raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"}) @asset(io_manager_key="warehouse") diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py index 9b4db4d8f8f5f..cdf0edbea5da4 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py @@ -1,13 +1,11 @@ from pandas import read_sql -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import Definitions, asset, define_asset_job from .mylib import create_db_connection, pickle_to_s3, train_recommender_model -raw_users = SourceAsset(key="raw_users") - -@asset(deps=[raw_users]) +@asset(deps=["raw_users"]) def users() -> None: raw_users_df = read_sql("select * from raw_users", con=create_db_connection()) users_df = raw_users_df.dropna() diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py index 35c35d776f52f..691b81f79e8f5 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py @@ -1,7 +1,7 @@ import dlt from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets -from dagster import AssetExecutionContext, SourceAsset +from dagster import AssetExecutionContext, AssetSpec @dlt.source @@ -25,6 +25,5 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource): thinkific_source_assets = [ - SourceAsset(key, group_name="thinkific") - for key in example_dlt_assets.dependency_keys + AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys ]