From 4cce155d1714e4a31ba7266a1b4630081974f163 Mon Sep 17 00:00:00 2001 From: Erin Cochran Date: Mon, 5 Aug 2024 17:51:42 -0400 Subject: [PATCH 1/2] Update examples --- .../assets_pandas_pyspark/assets/table_assets.py | 4 ++-- .../backfills/single_run_backfill_io_manager.py | 4 ++-- .../sda_io_manager.py | 4 ++-- .../enriching_with_software_defined_assets/sda_nothing.py | 4 ++-- .../integrations/bigquery/reference/dataset.py | 6 +++--- .../bigquery/tutorial/io_manager/full_example.py | 4 ++-- .../bigquery/tutorial/io_manager/source_asset.py | 4 ++-- .../integrations/bigquery/tutorial/resource/full_example.py | 4 ++-- .../integrations/bigquery/tutorial/resource/source_asset.py | 4 ++-- .../docs_snippets/integrations/deltalake/full_example.py | 4 ++-- .../docs_snippets/integrations/deltalake/schema.py | 4 ++-- .../docs_snippets/integrations/deltalake/source_asset.py | 4 ++-- .../docs_snippets/integrations/duckdb/reference/schema.py | 6 +++--- .../integrations/duckdb/tutorial/io_manager/full_example.py | 4 ++-- .../integrations/duckdb/tutorial/io_manager/source_asset.py | 4 ++-- .../integrations/duckdb/tutorial/resource/full_example.py | 4 ++-- .../integrations/embedded_elt/dlt_source_assets.py | 5 ++--- .../snowflake/io_manager_tutorial/full_example.py | 4 ++-- .../docs_snippets/integrations/snowflake/schema.py | 6 +++--- .../docs_snippets/integrations/snowflake/source_asset.py | 4 ++-- 20 files changed, 43 insertions(+), 44 deletions(-) diff --git a/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py b/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py index 9657069aabacc..d72b5b59efe9e 100644 --- a/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py +++ b/examples/assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py @@ -1,9 +1,9 @@ # start_marker import pandas as pd -from dagster import AssetKey, SourceAsset, asset +from dagster import AssetKey, AssetSpec, asset from pandas import DataFrame -sfo_q2_weather_sample = SourceAsset( +sfo_q2_weather_sample = AssetSpec( key=AssetKey("sfo_q2_weather_sample"), description="Weather samples, taken every five minutes at SFO", metadata={"format": "csv"}, diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py index ce5b4eb313b73..a3ea69d00f26e 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py @@ -1,12 +1,12 @@ # start_marker from dagster import ( AssetExecutionContext, + AssetSpec, BackfillPolicy, DailyPartitionsDefinition, InputContext, IOManager, OutputContext, - SourceAsset, asset, ) @@ -23,7 +23,7 @@ def handle_output(self, context: OutputContext, obj): daily_partition = DailyPartitionsDefinition(start_date="2020-01-01") -raw_events = SourceAsset("raw_events", partitions_def=daily_partition) +raw_events = AssetSpec("raw_events", partitions_def=daily_partition) @asset( diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py index 4527de70a3e90..7c09bd3782865 100644 --- 
a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py @@ -1,10 +1,10 @@ from pandas import DataFrame -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model -raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse") +raw_users = AssetSpec(key="raw_users", io_manager_key="warehouse") @asset(io_manager_key="warehouse") diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py index 9b4db4d8f8f5f..d5140c09b25fb 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py @@ -1,10 +1,10 @@ from pandas import read_sql -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import create_db_connection, pickle_to_s3, train_recommender_model -raw_users = SourceAsset(key="raw_users") +raw_users = AssetSpec(key="raw_users") @asset(deps=[raw_users]) diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/dataset.py b/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/dataset.py index 004093e3d0bd4..e711889b0b8a3 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/dataset.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/dataset.py @@ -1,11 +1,11 @@ import pandas as pd -from dagster import SourceAsset, asset +from dagster import AssetSpec, asset def scope_asset_key(): # start_asset_key - daffodil_data = SourceAsset(key=["gcp", "bigquery", "daffodil", "daffodil_data"]) + daffodil_data = AssetSpec(key=["gcp", "bigquery", "daffodil", "daffodil_data"]) @asset(key_prefix=["gcp", "bigquery", "iris"]) def iris_data() -> pd.DataFrame: @@ -25,7 +25,7 @@ def iris_data() -> pd.DataFrame: def scope_metadata(): # start_metadata - daffodil_data = SourceAsset(key=["daffodil_data"], metadata={"schema": "daffodil"}) + daffodil_data = AssetSpec(key=["daffodil_data"], metadata={"schema": "daffodil"}) @asset(metadata={"schema": "iris"}) def iris_data() -> pd.DataFrame: diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/full_example.py b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/full_example.py index 19816f23a0505..d126c356797af 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/full_example.py @@ -1,9 +1,9 @@ import pandas as pd from dagster_gcp_pandas import BigQueryPandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/source_asset.py
b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/source_asset.py index 3dc8e76299380..b23636f2094f0 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/source_asset.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/io_manager/source_asset.py @@ -1,3 +1,3 @@ -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/full_example.py b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/full_example.py index 47bd2229e53af..aa6df2b6f9af2 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/full_example.py @@ -2,9 +2,9 @@ from dagster_gcp import BigQueryResource from google.cloud import bigquery as bq -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/source_asset.py b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/source_asset.py index 3dc8e76299380..b23636f2094f0 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/source_asset.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/tutorial/resource/source_asset.py @@ -1,3 +1,3 @@ -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") diff --git a/examples/docs_snippets/docs_snippets/integrations/deltalake/full_example.py b/examples/docs_snippets/docs_snippets/integrations/deltalake/full_example.py index 569fb2fd1637b..3ae81fa12a24b 100644 --- a/examples/docs_snippets/docs_snippets/integrations/deltalake/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/deltalake/full_example.py @@ -2,9 +2,9 @@ from dagster_deltalake import LocalConfig from dagster_deltalake_pandas import DeltaLakePandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/deltalake/schema.py b/examples/docs_snippets/docs_snippets/integrations/deltalake/schema.py index 19fae3169b7b3..754c65e822936 100644 --- a/examples/docs_snippets/docs_snippets/integrations/deltalake/schema.py +++ b/examples/docs_snippets/docs_snippets/integrations/deltalake/schema.py @@ -2,9 +2,9 @@ import pandas as pd -from dagster import SourceAsset, asset +from dagster import AssetSpec, asset -daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) +daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) diff --git a/examples/docs_snippets/docs_snippets/integrations/deltalake/source_asset.py b/examples/docs_snippets/docs_snippets/integrations/deltalake/source_asset.py index 3dc8e76299380..b23636f2094f0 100644 --- 
a/examples/docs_snippets/docs_snippets/integrations/deltalake/source_asset.py +++ b/examples/docs_snippets/docs_snippets/integrations/deltalake/source_asset.py @@ -1,3 +1,3 @@ -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") diff --git a/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/schema.py b/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/schema.py index ee04c7933d061..9296c37a34c4c 100644 --- a/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/schema.py +++ b/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/schema.py @@ -1,11 +1,11 @@ import pandas as pd -from dagster import SourceAsset, asset +from dagster import AssetSpec, asset def scope_asset_key(): # start_asset_key - daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) + daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) def iris_dataset() -> pd.DataFrame: @@ -26,7 +26,7 @@ def iris_dataset() -> pd.DataFrame: def scope_metadata(): # start_metadata - daffodil_dataset = SourceAsset( + daffodil_dataset = AssetSpec( key=["daffodil_dataset"], metadata={"schema": "daffodil"} ) diff --git a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/full_example.py b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/full_example.py index 74fee7b83ebea..7d01539980c6b 100644 --- a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/full_example.py @@ -1,9 +1,9 @@ import pandas as pd from dagster_duckdb_pandas import DuckDBPandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/source_asset.py b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/source_asset.py index 3dc8e76299380..b23636f2094f0 100644 --- a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/source_asset.py +++ b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/io_manager/source_asset.py @@ -1,3 +1,3 @@ -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") diff --git a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/resource/full_example.py b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/resource/full_example.py index ba557a6e393ec..9d274e9f88541 100644 --- a/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/resource/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/duckdb/tutorial/resource/full_example.py @@ -1,9 +1,9 @@ import pandas as pd from dagster_duckdb import DuckDBResource -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py 
b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py index 35c35d776f52f..691b81f79e8f5 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_source_assets.py @@ -1,7 +1,7 @@ import dlt from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets -from dagster import AssetExecutionContext, SourceAsset +from dagster import AssetExecutionContext, AssetSpec @dlt.source @@ -25,6 +25,5 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource): thinkific_source_assets = [ - SourceAsset(key, group_name="thinkific") - for key in example_dlt_assets.dependency_keys + AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys ] diff --git a/examples/docs_snippets/docs_snippets/integrations/snowflake/io_manager_tutorial/full_example.py b/examples/docs_snippets/docs_snippets/integrations/snowflake/io_manager_tutorial/full_example.py index 83466c2056f16..694d78f9f0245 100644 --- a/examples/docs_snippets/docs_snippets/integrations/snowflake/io_manager_tutorial/full_example.py +++ b/examples/docs_snippets/docs_snippets/integrations/snowflake/io_manager_tutorial/full_example.py @@ -1,9 +1,9 @@ import pandas as pd from dagster_snowflake_pandas import SnowflakePandasIOManager -from dagster import Definitions, EnvVar, SourceAsset, asset +from dagster import AssetSpec, Definitions, EnvVar, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/examples/docs_snippets/docs_snippets/integrations/snowflake/schema.py b/examples/docs_snippets/docs_snippets/integrations/snowflake/schema.py index fd507cc2fd453..4763a83a41272 100644 --- a/examples/docs_snippets/docs_snippets/integrations/snowflake/schema.py +++ b/examples/docs_snippets/docs_snippets/integrations/snowflake/schema.py @@ -1,11 +1,11 @@ import pandas as pd -from dagster import SourceAsset, asset +from dagster import AssetSpec, asset def scope_asset_key(): # start_asset_key - daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) + daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) def iris_dataset() -> pd.DataFrame: @@ -25,7 +25,7 @@ def iris_dataset() -> pd.DataFrame: def scope_metadata(): # start_metadata - daffodil_dataset = SourceAsset( + daffodil_dataset = AssetSpec( key=["daffodil_dataset"], metadata={"schema": "daffodil"} ) diff --git a/examples/docs_snippets/docs_snippets/integrations/snowflake/source_asset.py b/examples/docs_snippets/docs_snippets/integrations/snowflake/source_asset.py index 3dc8e76299380..b23636f2094f0 100644 --- a/examples/docs_snippets/docs_snippets/integrations/snowflake/source_asset.py +++ b/examples/docs_snippets/docs_snippets/integrations/snowflake/source_asset.py @@ -1,3 +1,3 @@ -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") From cd6ca14d76f69f9e1aaae36a144a53f37f62890a Mon Sep 17 00:00:00 2001 From: Erin Cochran Date: Mon, 5 Aug 2024 17:51:50 -0400 Subject: [PATCH 2/2] Update docs --- .../backfills.mdx | 4 ++-- .../dagster/asset-versioning-and-caching.mdx | 2 +- ...enriching-with-software-defined-assets.mdx | 8 +++---- .../dagster/software-defined-assets.mdx | 6 ++--- .../integrations/bigquery/reference.mdx | 4 ++-- 
.../bigquery/using-bigquery-with-dagster.mdx | 22 +++++++++---------- .../integrations/deltalake/reference.mdx | 6 ++--- .../using-deltalake-with-dagster.mdx | 12 +++++----- .../content/integrations/duckdb/reference.mdx | 6 ++--- .../duckdb/using-duckdb-with-dagster.mdx | 22 +++++++++---------- .../content/integrations/embedded-elt/dlt.mdx | 9 ++++---- .../integrations/snowflake/reference.mdx | 4 ++-- ...ing-snowflake-with-dagster-io-managers.mdx | 12 +++++----- .../using-snowflake-with-dagster.mdx | 8 +++---- 14 files changed, 62 insertions(+), 63 deletions(-) diff --git a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx index 8f7737f24b839..c22a3cba5cdea 100644 --- a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx @@ -99,12 +99,12 @@ def events(context: AssetExecutionContext) -> None: ```python file=/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py startafter=start_marker endbefore=end_marker from dagster import ( AssetExecutionContext, + AssetSpec, BackfillPolicy, DailyPartitionsDefinition, InputContext, IOManager, OutputContext, - SourceAsset, asset, ) @@ -121,7 +121,7 @@ class MyIOManager(IOManager): daily_partition = DailyPartitionsDefinition(start_date="2020-01-01") -raw_events = SourceAsset("raw_events", partitions_def=daily_partition) +raw_events = AssetSpec("raw_events", partitions_def=daily_partition) @asset( diff --git a/docs/content/guides/dagster/asset-versioning-and-caching.mdx b/docs/content/guides/dagster/asset-versioning-and-caching.mdx index 8c740d59db832..6e530f4f9be09 100644 --- a/docs/content/guides/dagster/asset-versioning-and-caching.mdx +++ b/docs/content/guides/dagster/asset-versioning-and-caching.mdx @@ -257,7 +257,7 @@ If `versioned_number` had used a Dagster-generated data version, the data versio In the real world, data pipelines depend on external upstream data. So far in this guide, we haven't used any external data; we've been substituting hardcoded data in the asset at the root of our graph and using a code version as a stand-in for the version of that data. We can do better than this. -External data sources in Dagster are modeled by . We can add versioning to a `SourceAsset` by making it observable. An observable source asset has a user-defined function that computes and returns a data version. +External data sources in Dagster are modeled by . We can add versioning to a `SourceAsset` by making it observable. An observable source asset has a user-defined function that computes and returns a data version. Let's add an called `input_number`. 
This will represent a file written by an external process upstream of our pipeline: diff --git a/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx b/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx index c263f3b37e554..a732ba390e321 100644 --- a/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx +++ b/docs/content/guides/dagster/enriching-with-software-defined-assets.mdx @@ -219,11 +219,11 @@ Here's what an equivalent job looks like using asset definitions: ```python file=/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py from pandas import DataFrame -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model -raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse") +raw_users = AssetSpec(key="raw_users", io_manager_key="warehouse") @asset(io_manager_key="warehouse") @@ -301,11 +301,11 @@ Here's an example of an equivalent job that uses asset definitions: ```python file=/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py from pandas import read_sql -from dagster import Definitions, SourceAsset, asset, define_asset_job +from dagster import AssetSpec, Definitions, asset, define_asset_job from .mylib import create_db_connection, pickle_to_s3, train_recommender_model -raw_users = SourceAsset(key="raw_users") +raw_users = AssetSpec(key="raw_users") @asset(deps=[raw_users]) diff --git a/docs/content/guides/dagster/software-defined-assets.mdx b/docs/content/guides/dagster/software-defined-assets.mdx index bd1ba81644718..62f1975af1a87 100644 --- a/docs/content/guides/dagster/software-defined-assets.mdx +++ b/docs/content/guides/dagster/software-defined-assets.mdx @@ -29,10 +29,10 @@ Here are our asset definitions that define tables we want to materialize. ```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py startafter=start_marker endbefore=end_marker import pandas as pd -from dagster import AssetKey, SourceAsset, asset +from dagster import AssetKey, AssetSpec, asset from pandas import DataFrame -sfo_q2_weather_sample = SourceAsset( +sfo_q2_weather_sample = AssetSpec( key=AssetKey("sfo_q2_weather_sample"), description="Weather samples, taken every five minutes at SFO", metadata={"format": "csv"}, @@ -52,7 +52,7 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame: return daily_temperature_highs.nlargest(10, "max_tmpf") ``` -`sfo_q2_weather_sample` represents our base temperature table. It's a , meaning that we rely on it, but don't generate it. +`sfo_q2_weather_sample` represents our base temperature table. It's an , meaning that we rely on it, but don't generate it. `daily_temperature_highs` represents a computed asset. It's derived by taking the `sfo_q2_weather_sample` table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an . This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse. 
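To make the swap-in concrete, here is a minimal sketch of the kind of local-development I/O manager the paragraph above alludes to: it round-trips DataFrames through CSV files on disk. The class name and `base_dir` default are hypothetical illustrations, not part of this patch:

```python
import os

import pandas as pd

from dagster import ConfigurableIOManager, InputContext, OutputContext


class LocalCsvIOManager(ConfigurableIOManager):
    """Stores each asset as a CSV file under base_dir (local development only)."""

    base_dir: str = "/tmp/dagster_csvs"  # hypothetical default location

    def _path(self, context) -> str:
        # One CSV per asset, named after the asset key.
        return os.path.join(self.base_dir, *context.asset_key.path) + ".csv"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        os.makedirs(os.path.dirname(self._path(context)), exist_ok=True)
        obj.to_csv(self._path(context), index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._path(context))
```

In production, the same assets could be bound to a warehouse-backed I/O manager instead, without touching the asset bodies.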
diff --git a/docs/content/integrations/bigquery/reference.mdx b/docs/content/integrations/bigquery/reference.mdx index f1b404c7f2a9f..89613838ff067 100644 --- a/docs/content/integrations/bigquery/reference.mdx +++ b/docs/content/integrations/bigquery/reference.mdx @@ -273,7 +273,7 @@ You can specify the default dataset where data will be stored as configuration t If you want to store assets in different datasets, you can specify the dataset as metadata: ```python file=/integrations/bigquery/reference/dataset.py startafter=start_metadata endbefore=end_metadata dedent=4 -daffodil_data = SourceAsset(key=["daffodil_data"], metadata={"schema": "daffodil"}) +daffodil_data = AssetSpec(key=["daffodil_data"], metadata={"schema": "daffodil"}) @asset(metadata={"schema": "iris"}) def iris_data() -> pd.DataFrame: @@ -292,7 +292,7 @@ def iris_data() -> pd.DataFrame: You can also specify the dataset as part of the asset's asset key: ```python file=/integrations/bigquery/reference/dataset.py startafter=start_asset_key endbefore=end_asset_key dedent=4 -daffodil_data = SourceAsset(key=["gcp", "bigquery", "daffodil", "daffodil_data"]) +daffodil_data = AssetSpec(key=["gcp", "bigquery", "daffodil", "daffodil_data"]) @asset(key_prefix=["gcp", "bigquery", "iris"]) def iris_data() -> pd.DataFrame: diff --git a/docs/content/integrations/bigquery/using-bigquery-with-dagster.mdx b/docs/content/integrations/bigquery/using-bigquery-with-dagster.mdx index de68af18c9301..cd76769a00602 100644 --- a/docs/content/integrations/bigquery/using-bigquery-with-dagster.mdx +++ b/docs/content/integrations/bigquery/using-bigquery-with-dagster.mdx @@ -132,12 +132,12 @@ Now you can run `dagster dev` and materialize the `iris_data` asset from the Dag If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. ```python file=/integrations/bigquery/tutorial/resource/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, you're creating a for a pre-existing table called `iris_harvest_data`. +In this example, you're creating an for a pre-existing table called `iris_harvest_data`. @@ -176,9 +176,9 @@ import pandas as pd from dagster_gcp import BigQueryResource from google.cloud import bigquery as bq -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset @@ -314,14 +314,14 @@ When Dagster materializes the `iris_data` asset using the configuration from [St If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can create [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. 
When using an I/O manager, creating a source asset for an existing table also allows you to tell Dagster how to find the table so it can be fetched for downstream assets. ```python file=/integrations/bigquery/tutorial/io_manager/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, you're creating a for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, you need to tell the BigQuery I/O manager how to find the data, so that the I/O manager can load the data into memory. +In this example, you're creating an for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, you need to tell the BigQuery I/O manager how to find the data, so that the I/O manager can load the data into memory. -Because you already supplied the project and dataset in the I/O manager configuration in [Step 1: Configure the BigQuery I/O manager](#step-1-configure-the-bigquery-io-manager), you only need to provide the table name. This is done with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. +Because you already supplied the project and dataset in the I/O manager configuration in [Step 1: Configure the BigQuery I/O manager](#step-1-configure-the-bigquery-io-manager), you only need to provide the table name. This is done with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. @@ -355,9 +355,9 @@ When finished, your code should look like the following: import pandas as pd from dagster_gcp_pandas import BigQueryPandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/docs/content/integrations/deltalake/reference.mdx b/docs/content/integrations/deltalake/reference.mdx index e706b118413f5..353257312bdac 100644 --- a/docs/content/integrations/deltalake/reference.mdx +++ b/docs/content/integrations/deltalake/reference.mdx @@ -233,15 +233,15 @@ If you want all of your assets to be stored in the same schema, you can specify If you want to store assets in different schemas, you can specify the schema as part of the asset's asset key: -- **For `SourceAsset`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`. +- **For `AssetSpec`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`. - **For asset definitions**, use the `key_prefix` parameter. This value will be prepended to the asset name to create the full asset key. In the following example, this would be `iris`. 
```python file=/integrations/deltalake/schema.py startafter=start_asset_key endbefore=end_asset_key import pandas as pd -from dagster import SourceAsset, asset +from dagster import AssetSpec, asset -daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) +daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) diff --git a/docs/content/integrations/deltalake/using-deltalake-with-dagster.mdx b/docs/content/integrations/deltalake/using-deltalake-with-dagster.mdx index 40590e0e21dc0..37af75712fbe6 100644 --- a/docs/content/integrations/deltalake/using-deltalake-with-dagster.mdx +++ b/docs/content/integrations/deltalake/using-deltalake-with-dagster.mdx @@ -101,14 +101,14 @@ When Dagster materializes the `iris_dataset` asset using the configuration from If you already have tables in your Delta Lake, you may want to make them available to other Dagster assets. You can accomplish this by using [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets. ```python file=/integrations/deltalake/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, we create a for an existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the Delta Lake I/O manager how to find the data. +In this example, we create an for an existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the Delta Lake I/O manager how to find the data. -Because we already supplied the database and schema in the I/O manager configuration in [Step 1](#step-1-configure-the-delta-lake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `iris/iris_harvest_data` folder as a Pandas DataFrame and provide it to the downstream asset. +Because we already supplied the database and schema in the I/O manager configuration in [Step 1](#step-1-configure-the-delta-lake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `iris/iris_harvest_data` folder as a Pandas DataFrame and provide it to the downstream asset. 
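As a concrete illustration of that loading step, a downstream asset only needs an input parameter whose name matches the `AssetSpec` key; the configured I/O manager then hands it the table as a DataFrame. This `iris_harvest_summary` asset is a hypothetical sketch, not part of the patch:

```python
import pandas as pd

from dagster import asset


# Because the parameter name matches the AssetSpec key "iris_harvest_data",
# the configured I/O manager loads the existing table and passes it in.
@asset
def iris_harvest_summary(iris_harvest_data: pd.DataFrame) -> pd.DataFrame:
    return iris_harvest_data.describe()
```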
@@ -149,9 +149,9 @@ import pandas as pd from dagster_deltalake import LocalConfig from dagster_deltalake_pandas import DeltaLakePandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/docs/content/integrations/duckdb/reference.mdx b/docs/content/integrations/duckdb/reference.mdx index 0576da9697c7a..ef36c446fa293 100644 --- a/docs/content/integrations/duckdb/reference.mdx +++ b/docs/content/integrations/duckdb/reference.mdx @@ -274,7 +274,7 @@ You can specify the default schema where data will be stored as configuration to If you want to store assets in different schemas, you can specify the schema as metadata: ```python file=/integrations/duckdb/reference/schema.py startafter=start_metadata endbefore=end_metadata dedent=4 -daffodil_dataset = SourceAsset( +daffodil_dataset = AssetSpec( key=["daffodil_dataset"], metadata={"schema": "daffodil"} ) @@ -294,11 +294,11 @@ def iris_dataset() -> pd.DataFrame: You can also specify the schema as part of the asset's asset key: -- **For `SourceAsset`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`. +- **For `AssetSpec`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`. - **For asset definitions**, use the `key_prefix` parameter. This value will be prepended to the asset name to create the full asset key. In the following example, this would be `iris`. ```python file=/integrations/duckdb/reference/schema.py startafter=start_asset_key endbefore=end_asset_key dedent=4 -daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) +daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) def iris_dataset() -> pd.DataFrame: diff --git a/docs/content/integrations/duckdb/using-duckdb-with-dagster.mdx b/docs/content/integrations/duckdb/using-duckdb-with-dagster.mdx index 6346d48e5aeba..b42fd1327e39f 100644 --- a/docs/content/integrations/duckdb/using-duckdb-with-dagster.mdx +++ b/docs/content/integrations/duckdb/using-duckdb-with-dagster.mdx @@ -114,12 +114,12 @@ In this example, you're defining an asset that fetches the Iris dataset as a Pan If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. ```python file=/integrations/duckdb/tutorial/io_manager/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, you're creating a for a pre-existing table called `iris_harvest_data`. +In this example, you're creating an for a pre-existing table called `iris_harvest_data`. 
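Downstream assets can then query that pre-existing table directly through the DuckDB resource. The following is a hedged sketch, not code from this patch; the `iris.iris_harvest_data` qualified name and the report path are assumptions based on the tutorial's configuration:

```python
from dagster_duckdb import DuckDBResource

from dagster import asset


# Hypothetical downstream asset: `deps` records the lineage to the
# pre-existing table, and the resource is used to query it directly.
@asset(deps=["iris_harvest_data"])
def iris_harvest_report(duckdb: DuckDBResource) -> None:
    with duckdb.get_connection() as conn:
        df = conn.execute("SELECT * FROM iris.iris_harvest_data").fetch_df()
    df.to_csv("iris_harvest_report.csv", index=False)
```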
@@ -156,9 +156,9 @@ When finished, your code should look like the following: import pandas as pd from dagster_duckdb import DuckDBResource -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset @@ -275,14 +275,14 @@ When Dagster materializes the `iris_dataset` asset using the configuration from If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. ```python file=/integrations/duckdb/tutorial/io_manager/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, you're creating a for a pre-existing table containing iris harvests data. To make the data available to other Dagster assets, you need to tell the DuckDB I/O manager how to find the data. +In this example, you're creating an for a pre-existing table containing iris harvests data. To make the data available to other Dagster assets, you need to tell the DuckDB I/O manager how to find the data. -Because you already supplied the database and schema in the I/O manager configuration in [Step 1: Configure the DuckDB I/O manager](#step-1-configure-the-duckdb-io-manager), you only need to provide the table name. This is done with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. +Because you already supplied the database and schema in the I/O manager configuration in [Step 1: Configure the DuckDB I/O manager](#step-1-configure-the-duckdb-io-manager), you only need to provide the table name. This is done with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. @@ -314,9 +314,9 @@ When finished, your code should look like the following: import pandas as pd from dagster_duckdb_pandas import DuckDBPandasIOManager -from dagster import Definitions, SourceAsset, asset +from dagster import AssetSpec, Definitions, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/docs/content/integrations/embedded-elt/dlt.mdx b/docs/content/integrations/embedded-elt/dlt.mdx index e320b777df0f7..300eb617e69af 100644 --- a/docs/content/integrations/embedded-elt/dlt.mdx +++ b/docs/content/integrations/embedded-elt/dlt.mdx @@ -287,15 +287,15 @@ In this example, we customized the translator to change how the dlt assets' name A common question is how to define metadata on the source assets upstream of the dlt assets. -This can be accomplished by defining a with a key that matches the one defined in the method. +This can be accomplished by defining an with a key that matches the one defined in the method. 
-For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a with attributes like `group_name`. +For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive an with attributes like `group_name`. ```python file=/integrations/embedded_elt/dlt_source_assets.py import dlt from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets -from dagster import AssetExecutionContext, SourceAsset +from dagster import AssetExecutionContext, AssetSpec @dlt.source @@ -319,8 +319,7 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource): thinkific_source_assets = [ - SourceAsset(key, group_name="thinkific") - for key in example_dlt_assets.dependency_keys + AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys ] ``` diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index a48b7fa1f62be..aa86e02bff938 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -386,7 +386,7 @@ You can specify the default schema where data will be stored as configuration to To store assets in different schemas, specify the schema as metadata: ```python file=/integrations/snowflake/schema.py startafter=start_metadata endbefore=end_metadata dedent=4 -daffodil_dataset = SourceAsset( +daffodil_dataset = AssetSpec( key=["daffodil_dataset"], metadata={"schema": "daffodil"} ) @@ -407,7 +407,7 @@ def iris_dataset() -> pd.DataFrame: You can also specify the schema as part of the asset's asset key: ```python file=/integrations/snowflake/schema.py startafter=start_asset_key endbefore=end_asset_key dedent=4 -daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"]) +daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"]) @asset(key_prefix=["iris"]) def iris_dataset() -> pd.DataFrame: diff --git a/docs/content/integrations/snowflake/using-snowflake-with-dagster-io-managers.mdx b/docs/content/integrations/snowflake/using-snowflake-with-dagster-io-managers.mdx index caac2267e84a0..ceab5d95c5a15 100644 --- a/docs/content/integrations/snowflake/using-snowflake-with-dagster-io-managers.mdx +++ b/docs/content/integrations/snowflake/using-snowflake-with-dagster-io-managers.mdx @@ -135,14 +135,14 @@ When Dagster materializes the `iris_dataset` asset using the configuration from You may already have tables in Snowflake that you want to make available to other Dagster assets. You can create [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets. ```python file=/integrations/snowflake/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, we create a for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, we need to tell the Snowflake I/O manager how to find the data. +In this example, we create an for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. 
To make the data available to other Dagster assets, we need to tell the Snowflake I/O manager how to find the data. -Since we supply the database and the schema in the I/O manager configuration in [Step 1: Configure the Snowflake I/O manager](#step-1-configure-the-snowflake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `FLOWERS.IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. +Since we supply the database and the schema in the I/O manager configuration in [Step 1: Configure the Snowflake I/O manager](#step-1-configure-the-snowflake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `FLOWERS.IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset. @@ -180,9 +180,9 @@ When finished, your code should look like the following: import pandas as pd from dagster_snowflake_pandas import SnowflakePandasIOManager -from dagster import Definitions, EnvVar, SourceAsset, asset +from dagster import AssetSpec, Definitions, EnvVar, asset -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") @asset diff --git a/docs/content/integrations/snowflake/using-snowflake-with-dagster.mdx b/docs/content/integrations/snowflake/using-snowflake-with-dagster.mdx index 5552f7973783f..fa1730a2badbe 100644 --- a/docs/content/integrations/snowflake/using-snowflake-with-dagster.mdx +++ b/docs/content/integrations/snowflake/using-snowflake-with-dagster.mdx @@ -142,14 +142,14 @@ If you have existing tables in Snowflake and other assets defined in Dagster dep Making Dagster aware of these tables allows you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. For example: ```python file=/integrations/snowflake/source_asset.py -from dagster import SourceAsset +from dagster import AssetSpec -iris_harvest_data = SourceAsset(key="iris_harvest_data") +iris_harvest_data = AssetSpec(key="iris_harvest_data") ``` -In this example, we created a for a pre-existing table called `iris_harvest_data`. +In this example, we created an for a pre-existing table called `iris_harvest_data`. -Since we supplied the database and the schema in the resource configuration in [Step 1](#step-1-configure-the-snowflake-resource), we only need to provide the table name. We did this by using the `key` parameter in our . When the `iris_harvest_data` asset needs to be loaded in a downstream asset, the data in the `FLOWERS.IRIS.IRIS_HARVEST_DATA` table will be selected and provided to the asset. +Since we supplied the database and the schema in the resource configuration in [Step 1](#step-1-configure-the-snowflake-resource), we only need to provide the table name. We did this by using the `key` parameter in our . When the `iris_harvest_data` asset needs to be loaded in a downstream asset, the data in the `FLOWERS.IRIS.IRIS_HARVEST_DATA` table will be selected and provided to the asset.
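As a closing note on the migration itself: `AssetSpec` carries over the descriptive fields these docs previously set on `SourceAsset`. A minimal sketch, with illustrative values that are not part of this patch:

```python
from dagster import AssetKey, AssetSpec, Definitions

# The fields this patch migrates - key, description, metadata, group_name -
# all transfer directly from SourceAsset to AssetSpec.
iris_harvest_data = AssetSpec(
    key=AssetKey("iris_harvest_data"),
    description="Raw harvest measurements produced by an external process",
    metadata={"schema": "iris"},
    group_name="iris",
)

defs = Definitions(assets=[iris_harvest_data])
```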