Skip to content

Commit

Permalink
[docs-beta] Update to dagster as dg (#23864)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Update the beta docs code snippets to use the new `import dagster as dg` code pattern. This should be the new approach to all documentation that references dagster code, as consistency is important.

Here are a few reasons why I like this approach:

0) There is prior art for this approach in many libraries, notably `pandas`, `sklearn`, `numpy`, `matplotlib`, `pytest`, and many other packages that export many objects.

1) Fewer import lines in code samples/snippets. Compare the first image, which is 12 lines shorter. This makes code easier for users to understand and parse.

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/80phFRWNTJBzbgP9Xj13/1c9cc21d-3b04-4eab-b174-77d23cf74d63.png)

2) Another side benefit is that the type-ahead experience is improved.

In VS Code, for example, typing `dg.sensor` and pressing Ctrl+Space provides you with all the exported Sensor-like objects.

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/80phFRWNTJBzbgP9Xj13/3c0776cb-8006-488e-849a-36ae64c193a1.png)

Compare that to typing `sensor` and pressing Ctrl+Space when using `import dagster` instead:

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/80phFRWNTJBzbgP9Xj13/8de32f3e-7ac3-4902-84a5-ebba88ebea24.png)

3) Finally, we can always revert this change in the future by searching the examples for `dg.` if we decide this approach is fundamentally broken.

## How I Tested These Changes

bk, pytest

## Changelog [New | Bug | Docs]

NOCHANGELOG
  • Loading branch information
PedramNavid authored Aug 25, 2024
1 parent 8fa1b6e commit 9dcf00e
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 220 deletions.
15 changes: 15 additions & 0 deletions examples/docs_beta_snippets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
This module exists only to enable testing docs snippets/examples in CI, and should not be installed
otherwise.

## Code Style

Please use the following code style to keep imports short:

```python
import dagster as dg

def my_cool_asset(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    return dg.MaterializeResult(
        metadata={
            "foo": "bar",
        }
    )
```

## Testing

You can test that all code loads into Python correctly with:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,38 @@
from dagster import (
AssetExecutionContext,
AssetKey,
AssetMaterialization,
Definitions,
MaterializeResult,
RunRequest,
SensorEvaluationContext,
SkipReason,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch, perhaps some function sets metadata here")
yield MaterializeResult(metadata={"specific_property": "value"})
yield dg.MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext):
context.log.info("Running weekly report")


my_job = define_asset_job("my_job", [weekly_report])
my_job = dg.define_asset_job("my_job", [weekly_report])


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: dg.SensorEvaluationContext, asset_event):
# Provide a type hint on the underlying event
materialization: AssetMaterialization = (
materialization: dg.AssetMaterialization = (
asset_event.dagster_event.event_specific_data.materialization
)

# Example custom logic: Check if the asset metadata has a specific property
# highlight-start
if "specific_property" in materialization.metadata:
context.log.info("Triggering job based on custom evaluation logic")
yield RunRequest(run_key=context.cursor)
yield dg.RunRequest(run_key=context.cursor)
else:
yield SkipReason("Asset materialization does not have the required property")
yield dg.SkipReason("Asset materialization does not have the required property")
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,41 @@
from dagster import (
AssetExecutionContext,
AssetKey,
AssetMaterialization,
Config,
Definitions,
MaterializeResult,
RunConfig,
RunRequest,
SensorEvaluationContext,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


class MyConfig(Config):
class MyConfig(dg.Config):
param1: str


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch")
# highlight-next-line
yield MaterializeResult(metadata={"specific_property": "value"})
yield dg.MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext, config: MyConfig):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext, config: MyConfig):
context.log.info(f"Running weekly report with param1: {config.param1}")


my_job = define_asset_job(
my_job = dg.define_asset_job(
"my_job",
[weekly_report],
config=RunConfig(ops={"weekly_report": MyConfig(param1="value")}),
config=dg.RunConfig(ops={"weekly_report": MyConfig(param1="value")}),
)


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
materialization: AssetMaterialization = (
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: dg.SensorEvaluationContext, asset_event):
materialization: dg.AssetMaterialization = (
asset_event.dagster_event.event_specific_data.materialization
)

# Example custom logic: Check if the asset metadata has a specific property
# highlight-start
if "specific_property" in materialization.metadata:
yield RunRequest(
yield dg.RunRequest(
run_key=context.cursor,
run_config=RunConfig(
run_config=dg.RunConfig(
ops={
"weekly_report": MyConfig(
param1=str(materialization.metadata.get("specific_property"))
Expand All @@ -59,7 +46,7 @@ def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
from dagster import (
AssetKey,
MultiAssetSensorEvaluationContext,
RunRequest,
asset,
define_asset_job,
multi_asset_sensor,
)
import dagster as dg


@asset
@dg.asset
def target_asset():
pass


downstream_job = define_asset_job("downstream_job", [target_asset])
downstream_job = dg.define_asset_job("downstream_job", [target_asset])


@multi_asset_sensor(
@dg.multi_asset_sensor(
monitored_assets=[
AssetKey("upstream_asset_1"),
AssetKey("upstream_asset_2"),
dg.AssetKey("upstream_asset_1"),
dg.AssetKey("upstream_asset_2"),
],
job=downstream_job,
)
def my_multi_asset_sensor(context: MultiAssetSensorEvaluationContext):
def my_multi_asset_sensor(context: dg.MultiAssetSensorEvaluationContext):
run_requests = []
for (
asset_key,
materialization,
) in context.latest_materialization_records_by_key().items():
if materialization:
run_requests.append(RunRequest(asset_selection=[asset_key]))
run_requests.append(dg.RunRequest(asset_selection=[asset_key]))
context.advance_cursor({asset_key: materialization})
return run_requests
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
from dagster import (
DailyPartitionsDefinition,
asset,
build_schedule_from_partitioned_job,
define_asset_job,
)
import dagster as dg

daily_partition = DailyPartitionsDefinition(start_date="2024-05-20")
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-05-20")


@asset(partitions_def=daily_partition)
@dg.asset(partitions_def=daily_partition)
def daily_asset(): ...


partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])
partitioned_asset_job = dg.define_asset_job("partitioned_job", selection=[daily_asset])

# highlight-start
# This schedule will run daily
asset_partitioned_schedule = build_schedule_from_partitioned_job(
asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
partitioned_asset_job,
)
# highlight-end
Original file line number Diff line number Diff line change
@@ -1,35 +1,27 @@
from dagster import (
AssetExecutionContext,
AssetKey,
Definitions,
RunRequest,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch")


@asset
def weekly_report(context: AssetExecutionContext):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext):
context.log.info("Asset to trigger")


my_job = define_asset_job("my_job", [weekly_report])
my_job = dg.define_asset_job("my_job", [weekly_report])


# highlight-start
@asset_sensor(asset_key=AssetKey("daily_sales_data"), job_name="my_job")
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job_name="my_job")
def daily_sales_data_sensor():
return RunRequest()
return dg.RunRequest()
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
import dagster as dg


@asset
@dg.asset
def customer_data(): ...


@asset
@dg.asset
def sales_report(): ...


daily_refresh_job = define_asset_job(
daily_refresh_job = dg.define_asset_job(
"daily_refresh", selection=["customer_data", "sales_report"]
)

# highlight-start
daily_schedule = ScheduleDefinition(
daily_schedule = dg.ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
# highlight-end

defs = Definitions(
defs = dg.Definitions(
assets=[customer_data, sales_report],
jobs=[daily_refresh_job],
schedules=[daily_schedule],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
import random
from typing import List

from dagster import (
AssetExecutionContext,
Definitions,
RunRequest,
SkipReason,
asset,
define_asset_job,
sensor,
)


@asset
def my_asset(context: AssetExecutionContext):
import dagster as dg


@dg.asset
def my_asset(context: dg.AssetExecutionContext):
context.log.info("Hello, world!")


my_job = define_asset_job("my_job", selection=[my_asset])
my_job = dg.define_asset_job("my_job", selection=[my_asset])


# highlight-start
Expand All @@ -27,24 +19,22 @@ def check_for_new_files() -> List[str]:
return []


@sensor(job=my_job, minimum_interval_seconds=5)
@dg.sensor(job=my_job, minimum_interval_seconds=5)
def new_file_sensor():
new_files = check_for_new_files()
if new_files:
for filename in new_files:
yield RunRequest(run_key=filename)
yield dg.RunRequest(run_key=filename)
else:
yield SkipReason("No new files found")
yield dg.SkipReason("No new files found")
# highlight-end


defs = Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])
defs = dg.Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])


if __name__ == "__main__":
from dagster import materialize

new_file_sensor()
materialize(
dg.materialize(
[my_asset],
)
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from dagster import asset
import dagster as dg


# Warning! This is not the right way to create assets
@asset
@dg.asset
def download_files():
# Download files from S3, the web, etc.
...


@asset
@dg.asset
def unzip_files():
# Unzip files to local disk or persistent storage
...


@asset
@dg.asset
def load_data():
# Read data previously written and store in a data warehouse
...
Loading

1 comment on commit 9dcf00e

@github-actions
Copy link

@github-actions github-actions bot commented on 9dcf00e Aug 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs-beta ready!

✅ Preview
https://dagster-docs-beta-ivjpuqbv1-elementl.vercel.app
https://dagster-docs-beta.dagster-docs.io

Built with commit 9dcf00e.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.