Skip to content

Commit

Permalink
[deprecate-source-assets] remove SourceAsset from assorted docs (#23718)
Browse files Browse the repository at this point in the history
## Summary & Motivation

SourceAsset is a deprecated concept, but our dlt docs and some random
guides still reference them. This PR switches mentions to AssetSpec and
"external asset".

## How I Tested These Changes
  • Loading branch information
sryza authored Aug 21, 2024
1 parent b7610cd commit aff5ad4
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def events(context: AssetExecutionContext) -> None:
```python file=/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py startafter=start_marker endbefore=end_marker
from dagster import (
AssetExecutionContext,
AssetSpec,
BackfillPolicy,
DailyPartitionsDefinition,
InputContext,
IOManager,
OutputContext,
SourceAsset,
asset,
)

Expand All @@ -121,7 +121,7 @@ class MyIOManager(IOManager):

daily_partition = DailyPartitionsDefinition(start_date="2020-01-01")

raw_events = SourceAsset("raw_events", partitions_def=daily_partition)
raw_events = AssetSpec("raw_events", partitions_def=daily_partition)


@asset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ Here's what an equivalent job looks like using asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py
from pandas import DataFrame

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")
raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"})


@asset(io_manager_key="warehouse")
Expand Down Expand Up @@ -301,14 +301,12 @@ Here's an example of an equivalent job that uses asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py
from pandas import read_sql

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import Definitions, asset, define_asset_job

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

raw_users = SourceAsset(key="raw_users")


@asset(deps=[raw_users])
@asset(deps=["raw_users"])
def users() -> None:
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
Expand Down
9 changes: 6 additions & 3 deletions docs/content/guides/dagster/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ Here are our asset definitions that define tables we want to materialize.

```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py startafter=start_marker endbefore=end_marker
import pandas as pd
from dagster import AssetKey, SourceAsset, asset
from dagster import AssetKey, AssetSpec, Definitions, asset
from pandas import DataFrame

sfo_q2_weather_sample = SourceAsset(
sfo_q2_weather_sample = AssetSpec(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
Expand All @@ -50,9 +50,12 @@ def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
"""Computes the 10 hottest dates."""
return daily_temperature_highs.nlargest(10, "max_tmpf")


defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates])
```

`sfo_q2_weather_sample` represents our base temperature table. It's a <PyObject module="dagster" object="SourceAsset" />, meaning that we rely on it, but don't generate it.
`sfo_q2_weather_sample` represents our base temperature table. It's a table that's used by the assets in our pipeline, but that we're not responsible for generating. We define it using an <PyObject object="AssetSpec" />, which allows us to specify its attributes without providing a function that materializes it.

`daily_temperature_highs` represents a computed asset. It's derived by taking the `sfo_q2_weather_sample` table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an <PyObject object="IOManager" />. This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse.

Expand Down
13 changes: 6 additions & 7 deletions docs/content/integrations/embedded-elt/dlt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,19 @@ def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
### Assigning metadata to upstream source assets
### Assigning metadata to upstream external assets
A common question is how to define metadata on the source assets upstream of the dlt assets.
A common question is how to define metadata on the external assets upstream of the dlt assets.
This can be accomplished by defining a <PyObject object="SourceAsset" /> with a key that matches the one defined in the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
This can be accomplished by defining a <PyObject object="AssetSpec" /> with a key that matches the one defined in the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a <PyObject object="SourceAsset" /> with attributes like `group_name`.
For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a <PyObject object="AssetSpec" /> with attributes like `group_name`.
```python file=/integrations/embedded_elt/dlt_source_assets.py
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster import AssetExecutionContext, SourceAsset
from dagster import AssetExecutionContext, AssetSpec
@dlt.source
Expand All @@ -319,8 +319,7 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
thinkific_source_assets = [
SourceAsset(key, group_name="thinkific")
for key in example_dlt_assets.dependency_keys
AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys
]
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# start_marker
import pandas as pd
from dagster import AssetKey, SourceAsset, asset
from dagster import AssetKey, AssetSpec, Definitions, asset
from pandas import DataFrame

sfo_q2_weather_sample = SourceAsset(
sfo_q2_weather_sample = AssetSpec(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
Expand All @@ -23,4 +23,6 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
return daily_temperature_highs.nlargest(10, "max_tmpf")


defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates])

# end_marker
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import load_assets_from_modules, materialize
from dagster import materialize
from dagster._core.test_utils import instance_for_test

from assets_pandas_pyspark.assets import spark_asset, table_assets
Expand All @@ -9,7 +9,7 @@ def test_weather_assets():

with instance_for_test() as instance:
assert materialize(
load_assets_from_modules([table_assets]),
table_assets.defs.assets,
instance=instance,
resources={"io_manager": LocalFileSystemIOManager()},
).success
Expand All @@ -20,7 +20,7 @@ def test_spark_weather_assets():

with instance_for_test() as instance:
assert materialize(
load_assets_from_modules([table_assets, spark_asset]),
[*table_assets.defs.assets, spark_asset.daily_temperature_high_diffs],
instance=instance,
resources={"io_manager": LocalFileSystemIOManager()},
).success
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# start_marker
from dagster import (
AssetExecutionContext,
AssetSpec,
BackfillPolicy,
DailyPartitionsDefinition,
InputContext,
IOManager,
OutputContext,
SourceAsset,
asset,
)

Expand All @@ -23,7 +23,7 @@ def handle_output(self, context: OutputContext, obj):

daily_partition = DailyPartitionsDefinition(start_date="2020-01-01")

raw_events = SourceAsset("raw_events", partitions_def=daily_partition)
raw_events = AssetSpec("raw_events", partitions_def=daily_partition)


@asset(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from pandas import DataFrame

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")
raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"})


@asset(io_manager_key="warehouse")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from pandas import read_sql

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import Definitions, asset, define_asset_job

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

raw_users = SourceAsset(key="raw_users")


@asset(deps=[raw_users])
@asset(deps=["raw_users"])
def users() -> None:
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, SourceAsset
from dagster import AssetExecutionContext, AssetSpec


@dlt.source
Expand All @@ -25,6 +25,5 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):


thinkific_source_assets = [
SourceAsset(key, group_name="thinkific")
for key in example_dlt_assets.dependency_keys
AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys
]

1 comment on commit aff5ad4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-qhwm7tvc5-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit aff5ad4.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.