Skip to content

Commit

Permalink
remove SourceAsset from assorted docs
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Aug 21, 2024
1 parent 2616f6d commit 01c596f
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def events(context: AssetExecutionContext) -> None:
```python file=/concepts/partitions_schedules_sensors/backfills/single_run_backfill_io_manager.py startafter=start_marker endbefore=end_marker
from dagster import (
AssetExecutionContext,
AssetSpec,
BackfillPolicy,
DailyPartitionsDefinition,
InputContext,
IOManager,
OutputContext,
SourceAsset,
asset,
)

Expand All @@ -121,7 +121,7 @@ class MyIOManager(IOManager):

daily_partition = DailyPartitionsDefinition(start_date="2020-01-01")

raw_events = SourceAsset("raw_events", partitions_def=daily_partition)
raw_events = AssetSpec("raw_events", partitions_def=daily_partition)


@asset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ Here's what an equivalent job looks like using asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_io_manager.py
from pandas import DataFrame

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")
raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"})


@asset(io_manager_key="warehouse")
Expand Down Expand Up @@ -301,14 +301,12 @@ Here's an example of an equivalent job that uses asset definitions:
```python file=/guides/dagster/enriching_with_software_defined_assets/sda_nothing.py
from pandas import read_sql

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import Definitions, asset, define_asset_job

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

raw_users = SourceAsset(key="raw_users")


@asset(deps=[raw_users])
@asset(deps=["raw_users"])
def users() -> None:
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
Expand Down
9 changes: 6 additions & 3 deletions docs/content/guides/dagster/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ Here are our asset definitions that define tables we want to materialize.

```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/table_assets.py startafter=start_marker endbefore=end_marker
import pandas as pd
from dagster import AssetKey, SourceAsset, asset
from dagster import AssetKey, AssetSpec, Definitions, asset
from pandas import DataFrame

sfo_q2_weather_sample = SourceAsset(
sfo_q2_weather_sample = AssetSpec(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
Expand All @@ -50,9 +50,12 @@ def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
"""Computes the 10 hottest dates."""
return daily_temperature_highs.nlargest(10, "max_tmpf")


defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates])
```

`sfo_q2_weather_sample` represents our base temperature table. It's a <PyObject module="dagster" object="SourceAsset" />, meaning that we rely on it, but don't generate it.
`sfo_q2_weather_sample` represents our base temperature table. It's a table that's used by the assets in our pipeline, but that we're not responsible for generating. We define it using an <PyObject object="AssetSpec" />, which allows us to specify its attributes without providing a function that materializes it.

`daily_temperature_highs` represents a computed asset. It's derived by taking the `sfo_q2_weather_sample` table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an <PyObject object="IOManager" />. This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse.

Expand Down
13 changes: 6 additions & 7 deletions docs/content/integrations/embedded-elt/dlt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,19 @@ def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
### Assigning metadata to upstream source assets
### Assigning metadata to upstream external assets
A common question is how to define metadata on the source assets upstream of the dlt assets.
A common question is how to define metadata on the external assets upstream of the dlt assets.
This can be accomplished by defining a <PyObject object="SourceAsset" /> with a key that matches the one defined in the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
This can be accomplished by defining a <PyObject object="AssetSpec" /> with a key that matches the one defined in the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a <PyObject object="SourceAsset" /> with attributes like `group_name`.
For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a <PyObject object="AssetSpec" /> with attributes like `group_name`.
```python file=/integrations/embedded_elt/dlt_source_assets.py
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster import AssetExecutionContext, SourceAsset
from dagster import AssetExecutionContext, AssetSpec
@dlt.source
Expand All @@ -319,8 +319,7 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
thinkific_source_assets = [
SourceAsset(key, group_name="thinkific")
for key in example_dlt_assets.dependency_keys
AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys
]
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# start_marker
import pandas as pd
from dagster import AssetKey, SourceAsset, asset
from dagster import AssetKey, AssetSpec, Definitions, asset
from pandas import DataFrame

sfo_q2_weather_sample = SourceAsset(
sfo_q2_weather_sample = AssetSpec(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
Expand All @@ -23,4 +23,6 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
return daily_temperature_highs.nlargest(10, "max_tmpf")


defs = Definitions(assets=[sfo_q2_weather_sample, daily_temperature_highs, hottest_dates])

# end_marker
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# start_marker
from dagster import (
AssetExecutionContext,
AssetSpec,
BackfillPolicy,
DailyPartitionsDefinition,
InputContext,
IOManager,
OutputContext,
SourceAsset,
asset,
)

Expand All @@ -23,7 +23,7 @@ def handle_output(self, context: OutputContext, obj):

daily_partition = DailyPartitionsDefinition(start_date="2020-01-01")

raw_events = SourceAsset("raw_events", partitions_def=daily_partition)
raw_events = AssetSpec("raw_events", partitions_def=daily_partition)


@asset(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from pandas import DataFrame

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import AssetSpec, Definitions, asset, define_asset_job

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")
raw_users = AssetSpec(key="raw_users", metadata={"dagster/io_manager_key": "warehouse"})


@asset(io_manager_key="warehouse")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from pandas import read_sql

from dagster import Definitions, SourceAsset, asset, define_asset_job
from dagster import Definitions, asset, define_asset_job

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

raw_users = SourceAsset(key="raw_users")


@asset(deps=[raw_users])
@asset(deps=["raw_users"])
def users() -> None:
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, SourceAsset
from dagster import AssetExecutionContext, AssetSpec


@dlt.source
Expand All @@ -25,6 +25,5 @@ def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):


thinkific_source_assets = [
SourceAsset(key, group_name="thinkific")
for key in example_dlt_assets.dependency_keys
AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys
]
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.models.operator import BaseOperator
from dagster_airlift.in_airflow import mark_as_dagster_migrating
from dagster_airlift.migration_state import load_migration_state_from_yaml

from dbt_example.shared.load_iris import load_csv_to_duckdb


Expand Down

0 comments on commit 01c596f

Please sign in to comment.