[docs] - Update examples and docs to use AssetSpec #23431

Closed · wants to merge 2 commits
@@ -99,12 +99,12 @@ def events(context: AssetExecutionContext) -> None:
```python file=/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py startafter=start_marker endbefore=end_marker
from dagster import (
AssetExecutionContext,
AssetSpec,
BackfillPolicy,
DailyPartitionsDefinition,
InputContext,
IOManager,
OutputContext,
SourceAsset,
asset,
)

@@ -121,7 +121,7 @@ class MyIOManager(IOManager):

daily_partition = DailyPartitionsDefinition(start_date="2020-01-01")

raw_events = SourceAsset("raw_events", partitions_def=daily_partition)
raw_events = AssetSpec("raw_events", partitions_def=daily_partition)


@asset(
@@ -257,7 +257,7 @@ If `versioned_number` had used a Dagster-generated data version, the data versio

In the real world, data pipelines depend on external upstream data. So far in this guide, we haven't used any external data; we've been substituting hardcoded data in the asset at the root of our graph and using a code version as a stand-in for the version of that data. We can do better than this.

External data sources in Dagster are modeled by <PyObject object="SourceAsset" displayText="SourceAssets" />. We can add versioning to a `SourceAsset` by making it observable. An observable source asset has a user-defined function that computes and returns a data version.
External data sources in Dagster are modeled by <PyObject object="AssetSpec" displayText="AssetSpecs" />. We can add versioning to a `SourceAsset` by making it observable. An observable source asset has a user-defined function that computes and returns a data version.

Let's add an <PyObject object="observable_source_asset" decorator="true" /> called `input_number`. This will represent a file written by an external process upstream of our pipeline:
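A minimal sketch of what such an observable source asset could look like follows; the file path and hashing scheme here are assumptions for illustration, not the guide's exact snippet:

```python
import hashlib

from dagster import DataVersion, observable_source_asset


@observable_source_asset
def input_number() -> DataVersion:
    # Hash the external file's contents so the data version changes
    # whenever the upstream process rewrites the file.
    with open("/tmp/input_number.txt", "rb") as f:
        return DataVersion(hashlib.sha256(f.read()).hexdigest())
```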

@@ -219,11 +219,11 @@ Here's what an equivalent job looks like using asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py
from pandas import DataFrame

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")
raw_users = AssetSpec(key="raw_users", io_manager_key="warehouse")


@asset(io_manager_key="warehouse")
@@ -301,11 +301,11 @@ Here's an example of an equivalent job that uses asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py
from pandas import read_sql

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

raw_users = SourceAsset(key="raw_users")
raw_users = AssetSpec(key="raw_users")


@asset(deps=[raw_users])
6 changes: 3 additions & 3 deletions docs/content/guides/dagster/software-defined-assets.mdx
@@ -29,10 +29,10 @@ Here are our asset definitions that define tables we want to materialize.

```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py startafter=start_marker endbefore=end_marker
import pandas as pd
from dagster import AssetKey, SourceAsset, asset
from dagster import AssetKey, AssetSpec, asset
from pandas import DataFrame

sfo_q2_weather_sample = SourceAsset(
sfo_q2_weather_sample = AssetSpec(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
@@ -52,7 +52,7 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
return daily_temperature_highs.nlargest(10, "max_tmpf")
```

`sfo_q2_weather_sample` represents our base temperature table. It's a <PyObject module="dagster" object="SourceAsset" />, meaning that we rely on it, but don't generate it.
`sfo_q2_weather_sample` represents our base temperature table. It's an <PyObject module="dagster" object="AssetSpec" />, meaning that we rely on it, but don't generate it.

`daily_temperature_highs` represents a computed asset. It's derived by taking the `sfo_q2_weather_sample` table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an <PyObject object="IOManager" />. This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse.
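To make that swap concrete, here is a hedged sketch of a local-development I/O manager along those lines; the class name, storage directory, and CSV format are assumptions for illustration, not the guide's actual implementation:

```python
import os

import pandas as pd
from dagster import ConfigurableIOManager, InputContext, OutputContext


class LocalCsvIOManager(ConfigurableIOManager):
    """Stores each asset as a CSV file named after its asset key."""

    base_dir: str = "/tmp/dagster_csvs"

    def _path(self, asset_key) -> str:
        return os.path.join(self.base_dir, "_".join(asset_key.path) + ".csv")

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        os.makedirs(self.base_dir, exist_ok=True)
        obj.to_csv(self._path(context.asset_key), index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._path(context.asset_key))
```

In production, the same assets could instead be bound to a warehouse-backed I/O manager without changing the asset code itself.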

4 changes: 2 additions & 2 deletions docs/content/integrations/bigquery/reference.mdx
@@ -273,7 +273,7 @@ You can specify the default dataset where data will be stored as configuration t
If you want to store assets in different datasets, you can specify the dataset as metadata:

```python file=/integrations/bigquery/reference/dataset.py startafter=start_metadata endbefore=end_metadata dedent=4
daffodil_data = SourceAsset(key=["daffodil_data"], metadata={"schema": "daffodil"})
daffodil_data = AssetSpec(key=["daffodil_data"], metadata={"schema": "daffodil"})

@asset(metadata={"schema": "iris"})
def iris_data() -> pd.DataFrame:
@@ -292,7 +292,7 @@ def iris_data() -> pd.DataFrame:
You can also specify the dataset as part of the asset's asset key:

```python file=/integrations/bigquery/reference/dataset.py startafter=start_asset_key endbefore=end_asset_key dedent=4
daffodil_data = SourceAsset(key=["gcp", "bigquery", "daffodil", "daffodil_data"])
daffodil_data = AssetSpec(key=["gcp", "bigquery", "daffodil", "daffodil_data"])

@asset(key_prefix=["gcp", "bigquery", "iris"])
def iris_data() -> pd.DataFrame:
@@ -132,12 +132,12 @@ Now you can run `dagster dev` and materialize the `iris_data` asset from the Dag
If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables.

```python file=/integrations/bigquery/tutorial/resource/source_asset.py
from dagster import SourceAsset
from dagster import AssetSpec

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, you're creating a <PyObject object="SourceAsset" /> for a pre-existing table called `iris_harvest_data`.
In this example, you're creating an <PyObject object="AssetSpec" /> for a pre-existing table called `iris_harvest_data`.
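If a downstream asset needs to record that dependency, it can reference the spec by key with `deps`. A minimal, hypothetical sketch (the `iris_cleaned` asset is not part of this tutorial):

```python
from dagster import asset


@asset(deps=["iris_harvest_data"])
def iris_cleaned() -> None:
    # With the resource-based approach, the dependency records lineage only;
    # your own code queries the pre-existing BigQuery table.
    ...
```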

</TabItem>

@@ -176,9 +176,9 @@ import pandas as pd
from dagster_gcp import BigQueryResource
from google.cloud import bigquery as bq

from dagster import Definitions, SourceAsset, asset
from dagster import AssetSpec, Definitions, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
@@ -314,14 +314,14 @@ When Dagster materializes the `iris_data` asset using the configuration from [St
If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can create [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. When using an I/O manager, creating a source asset for an existing table also allows you to tell Dagster how to find the table so it can be fetched for downstream assets.

```python file=/integrations/bigquery/tutorial/io_manager/source_asset.py
from dagster import SourceAsset
from dagster import AssetSpec

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, you're creating a <PyObject object="SourceAsset" /> for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, you need to tell the BigQuery I/O manager how to find the data, so that the I/O manager can load the data into memory.
In this example, you're creating an <PyObject object="AssetSpec" /> for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, you need to tell the BigQuery I/O manager how to find the data, so that the I/O manager can load the data into memory.

Because you already supplied the project and dataset in the I/O manager configuration in [Step 1: Configure the BigQuery I/O manager](#step-1-configure-the-bigquery-io-manager), you only need to provide the table name. This is done with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset.
Because you already supplied the project and dataset in the I/O manager configuration in [Step 1: Configure the BigQuery I/O manager](#step-1-configure-the-bigquery-io-manager), you only need to provide the table name. This is done with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset.
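To make the loading step concrete, here is a hedged sketch of a downstream asset that receives that DataFrame; the `iris_harvest_summary` asset is illustrative and not part of the tutorial:

```python
import pandas as pd

from dagster import asset


@asset
def iris_harvest_summary(iris_harvest_data: pd.DataFrame) -> pd.DataFrame:
    # Naming the parameter after the upstream asset key prompts the BigQuery
    # I/O manager to load IRIS.IRIS_HARVEST_DATA as a DataFrame.
    return iris_harvest_data.describe()
```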

</TabItem>
</TabGroup>
@@ -355,9 +355,9 @@ When finished, your code should look like the following:
import pandas as pd
from dagster_gcp_pandas import BigQueryPandasIOManager

from dagster import Definitions, SourceAsset, asset
from dagster import AssetSpec, Definitions, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
6 changes: 3 additions & 3 deletions docs/content/integrations/deltalake/reference.mdx
@@ -233,15 +233,15 @@ If you want all of your assets to be stored in the same schema, you can specify

If you want to store assets in different schemas, you can specify the schema as part of the asset's asset key:

- **For `SourceAsset`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`.
- **For `AssetSpec`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`.
- **For asset definitions**, use the `key_prefix` parameter. This value will be prepended to the asset name to create the full asset key. In the following example, this would be `iris`.

```python file=/integrations/deltalake/schema.py startafter=start_asset_key endbefore=end_asset_key
import pandas as pd

from dagster import SourceAsset, asset
from dagster import AssetSpec, asset

daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"])
daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"])


@asset(key_prefix=["iris"])
@@ -101,14 +101,14 @@ When Dagster materializes the `iris_dataset` asset using the configuration from
If you already have tables in your Delta Lake, you may want to make them available to other Dagster assets. You can accomplish this by using [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets.

```python file=/integrations/deltalake/source_asset.py
from dagster import SourceAsset
from dagster import AssetSpec

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, we create a <PyObject object="SourceAsset" /> for an existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the Delta Lake I/O manager how to find the data.
In this example, we create an <PyObject object="AssetSpec" /> for an existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the Delta Lake I/O manager how to find the data.

Because we already supplied the database and schema in the I/O manager configuration in [Step 1](#step-1-configure-the-delta-lake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `iris/iris_harvest_data` folder as a Pandas DataFrame and provide it to the downstream asset.
Because we already supplied the database and schema in the I/O manager configuration in [Step 1](#step-1-configure-the-delta-lake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `iris/iris_harvest_data` folder as a Pandas DataFrame and provide it to the downstream asset.
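As a rough illustration (the `iris_harvest_report` asset below is assumed, not part of the guide), a downstream asset that declares `iris_harvest_data` as an input receives that DataFrame automatically:

```python
import pandas as pd

from dagster import asset


@asset
def iris_harvest_report(iris_harvest_data: pd.DataFrame) -> pd.DataFrame:
    # The Delta Lake I/O manager reads the iris/iris_harvest_data table
    # before this asset body runs.
    return iris_harvest_data.head(10)
```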

</TabItem>
</TabGroup>
@@ -149,9 +149,9 @@ import pandas as pd
from dagster_deltalake import LocalConfig
from dagster_deltalake_pandas import DeltaLakePandasIOManager

from dagster import Definitions, SourceAsset, asset
from dagster import AssetSpec, Definitions, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
6 changes: 3 additions & 3 deletions docs/content/integrations/duckdb/reference.mdx
@@ -274,7 +274,7 @@ You can specify the default schema where data will be stored as configuration to
If you want to store assets in different schemas, you can specify the schema as metadata:

```python file=/integrations/duckdb/reference/schema.py startafter=start_metadata endbefore=end_metadata dedent=4
daffodil_dataset = SourceAsset(
daffodil_dataset = AssetSpec(
key=["daffodil_dataset"], metadata={"schema": "daffodil"}
)

@@ -294,11 +294,11 @@ def iris_dataset() -> pd.DataFrame:

You can also specify the schema as part of the asset's asset key:

- **For `SourceAsset`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`.
- **For `AssetSpec`**, use the `key` parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be `daffodil`.
- **For asset definitions**, use the `key_prefix` parameter. This value will be prepended to the asset name to create the full asset key. In the following example, this would be `iris`.

```python file=/integrations/duckdb/reference/schema.py startafter=start_asset_key endbefore=end_asset_key dedent=4
daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"])
daffodil_dataset = AssetSpec(key=["daffodil", "daffodil_dataset"])

@asset(key_prefix=["iris"])
def iris_dataset() -> pd.DataFrame:
22 changes: 11 additions & 11 deletions docs/content/integrations/duckdb/using-duckdb-with-dagster.mdx
@@ -114,12 +114,12 @@ In this example, you're defining an asset that fetches the Iris dataset as a Pan
If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables.

```python file=/integrations/duckdb/tutorial/io_manager/source_asset.py
from dagster import SourceAsset
from dagster import AssetSpec

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, you're creating a <PyObject object="SourceAsset" /> for a pre-existing table called `iris_harvest_data`.
In this example, you're creating an <PyObject object="AssetSpec" /> for a pre-existing table called `iris_harvest_data`.

</TabItem>

@@ -156,9 +156,9 @@ When finished, your code should look like the following:
import pandas as pd
from dagster_duckdb import DuckDBResource

from dagster import Definitions, SourceAsset, asset
from dagster import AssetSpec, Definitions, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
@@ -275,14 +275,14 @@ When Dagster materializes the `iris_dataset` asset using the configuration from
If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by creating [source assets](/concepts/io-management/io-managers#using-io-managers-to-load-source-data) for these tables.

```python file=/integrations/duckdb/tutorial/io_manager/source_asset.py
from dagster import SourceAsset
from dagster import AssetSpec

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, you're creating a <PyObject object="SourceAsset" /> for a pre-existing table containing iris harvests data. To make the data available to other Dagster assets, you need to tell the DuckDB I/O manager how to find the data.
In this example, you're creating an <PyObject object="AssetSpec" /> for a pre-existing table containing iris harvests data. To make the data available to other Dagster assets, you need to tell the DuckDB I/O manager how to find the data.

Because you already supplied the database and schema in the I/O manager configuration in [Step 1: Configure the DuckDB I/O manager](#step-1-configure-the-duckdb-io-manager), you only need to provide the table name. This is done with the `key` parameter in `SourceAsset`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset.
Because you already supplied the database and schema in the I/O manager configuration in [Step 1: Configure the DuckDB I/O manager](#step-1-configure-the-duckdb-io-manager), you only need to provide the table name. This is done with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `IRIS.IRIS_HARVEST_DATA` table as a Pandas DataFrame and provide it to the downstream asset.
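For example (hypothetical asset name, not part of the tutorial), a downstream asset only has to accept `iris_harvest_data` as an argument for the DuckDB I/O manager to load it:

```python
import pandas as pd

from dagster import asset


@asset
def deduplicated_iris_harvest(iris_harvest_data: pd.DataFrame) -> pd.DataFrame:
    # The DuckDB I/O manager selects IRIS.IRIS_HARVEST_DATA into this DataFrame.
    return iris_harvest_data.drop_duplicates()
```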

</TabItem>
</TabGroup>
@@ -314,9 +314,9 @@ When finished, your code should look like the following:
import pandas as pd
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions, SourceAsset, asset
from dagster import AssetSpec, Definitions, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")
iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset