Skip to content

Commit

Permalink
[docs-beta] Update to dagster as dg
Browse files Browse the repository at this point in the history
  • Loading branch information
PedramNavid committed Aug 23, 2024
1 parent d415364 commit b4ad258
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 220 deletions.
15 changes: 15 additions & 0 deletions examples/docs_beta_snippets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
This module exists only to enable testing docs snippets/examples in CI, and should not be installed
otherwise.

## Code Style

Please use the following code style to keep imports short:

```python
import dagster as dg

def my_cool_asset(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Style example: reach every Dagster name through the ``dg`` alias."""
    return dg.MaterializeResult(
        metadata={
            "foo": "bar",
        }
    )
```

## Testing

You can test that all code loads into Python correctly with:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,38 @@
from dagster import (
AssetExecutionContext,
AssetKey,
AssetMaterialization,
Definitions,
MaterializeResult,
RunRequest,
SensorEvaluationContext,
SkipReason,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch, perhaps some function sets metadata here")
yield MaterializeResult(metadata={"specific_property": "value"})
yield dg.MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext):
context.log.info("Running weekly report")


my_job = define_asset_job("my_job", [weekly_report])
my_job = dg.define_asset_job("my_job", [weekly_report])


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: dg.SensorEvaluationContext, asset_event):
# Provide a type hint on the underlying event
materialization: AssetMaterialization = (
materialization: dg.AssetMaterialization = (
asset_event.dagster_event.event_specific_data.materialization
)

# Example custom logic: Check if the asset metadata has a specific property
# highlight-start
if "specific_property" in materialization.metadata:
context.log.info("Triggering job based on custom evaluation logic")
yield RunRequest(run_key=context.cursor)
yield dg.RunRequest(run_key=context.cursor)
else:
yield SkipReason("Asset materialization does not have the required property")
yield dg.SkipReason("Asset materialization does not have the required property")
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,41 @@
from dagster import (
AssetExecutionContext,
AssetKey,
AssetMaterialization,
Config,
Definitions,
MaterializeResult,
RunConfig,
RunRequest,
SensorEvaluationContext,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


class MyConfig(Config):
class MyConfig(dg.Config):
param1: str


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch")
# highlight-next-line
yield MaterializeResult(metadata={"specific_property": "value"})
yield dg.MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext, config: MyConfig):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext, config: MyConfig):
context.log.info(f"Running weekly report with param1: {config.param1}")


my_job = define_asset_job(
my_job = dg.define_asset_job(
"my_job",
[weekly_report],
config=RunConfig(ops={"weekly_report": MyConfig(param1="value")}),
config=dg.RunConfig(ops={"weekly_report": MyConfig(param1="value")}),
)


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
materialization: AssetMaterialization = (
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: dg.SensorEvaluationContext, asset_event):
materialization: dg.AssetMaterialization = (
asset_event.dagster_event.event_specific_data.materialization
)

# Example custom logic: Check if the asset metadata has a specific property
# highlight-start
if "specific_property" in materialization.metadata:
yield RunRequest(
yield dg.RunRequest(
run_key=context.cursor,
run_config=RunConfig(
run_config=dg.RunConfig(
ops={
"weekly_report": MyConfig(
param1=str(materialization.metadata.get("specific_property"))
Expand All @@ -59,7 +46,7 @@ def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
from dagster import (
AssetKey,
MultiAssetSensorEvaluationContext,
RunRequest,
asset,
define_asset_job,
multi_asset_sensor,
)
import dagster as dg


@asset
@dg.asset
def target_asset():
pass


downstream_job = define_asset_job("downstream_job", [target_asset])
downstream_job = dg.define_asset_job("downstream_job", [target_asset])


@multi_asset_sensor(
@dg.multi_asset_sensor(
monitored_assets=[
AssetKey("upstream_asset_1"),
AssetKey("upstream_asset_2"),
dg.AssetKey("upstream_asset_1"),
dg.AssetKey("upstream_asset_2"),
],
job=downstream_job,
)
def my_multi_asset_sensor(context: MultiAssetSensorEvaluationContext):
def my_multi_asset_sensor(context: dg.MultiAssetSensorEvaluationContext):
run_requests = []
for (
asset_key,
materialization,
) in context.latest_materialization_records_by_key().items():
if materialization:
run_requests.append(RunRequest(asset_selection=[asset_key]))
run_requests.append(dg.RunRequest(asset_selection=[asset_key]))
context.advance_cursor({asset_key: materialization})
return run_requests
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
from dagster import (
DailyPartitionsDefinition,
asset,
build_schedule_from_partitioned_job,
define_asset_job,
)
import dagster as dg

daily_partition = DailyPartitionsDefinition(start_date="2024-05-20")
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-05-20")


@asset(partitions_def=daily_partition)
@dg.asset(partitions_def=daily_partition)
def daily_asset(): ...


partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])
partitioned_asset_job = dg.define_asset_job("partitioned_job", selection=[daily_asset])

# highlight-start
# This schedule will run daily
asset_partitioned_schedule = build_schedule_from_partitioned_job(
asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
partitioned_asset_job,
)
# highlight-end
Original file line number Diff line number Diff line change
@@ -1,35 +1,27 @@
from dagster import (
AssetExecutionContext,
AssetKey,
Definitions,
RunRequest,
asset,
asset_sensor,
define_asset_job,
)
import dagster as dg


@asset
def daily_sales_data(context: AssetExecutionContext):
@dg.asset
def daily_sales_data(context: dg.AssetExecutionContext):
context.log.info("Asset to watch")


@asset
def weekly_report(context: AssetExecutionContext):
@dg.asset
def weekly_report(context: dg.AssetExecutionContext):
context.log.info("Asset to trigger")


my_job = define_asset_job("my_job", [weekly_report])
my_job = dg.define_asset_job("my_job", [weekly_report])


# highlight-start
@asset_sensor(asset_key=AssetKey("daily_sales_data"), job_name="my_job")
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales_data"), job_name="my_job")
def daily_sales_data_sensor():
return RunRequest()
return dg.RunRequest()
# highlight-end


defs = Definitions(
defs = dg.Definitions(
assets=[daily_sales_data, weekly_report],
jobs=[my_job],
sensors=[daily_sales_data_sensor],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
import dagster as dg


@asset
@dg.asset
def customer_data(): ...


@asset
@dg.asset
def sales_report(): ...


daily_refresh_job = define_asset_job(
daily_refresh_job = dg.define_asset_job(
"daily_refresh", selection=["customer_data", "sales_report"]
)

# highlight-start
daily_schedule = ScheduleDefinition(
daily_schedule = dg.ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
# highlight-end

defs = Definitions(
defs = dg.Definitions(
assets=[customer_data, sales_report],
jobs=[daily_refresh_job],
schedules=[daily_schedule],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
import random
from typing import List

from dagster import (
AssetExecutionContext,
Definitions,
RunRequest,
SkipReason,
asset,
define_asset_job,
sensor,
)


@asset
def my_asset(context: AssetExecutionContext):
import dagster as dg


@dg.asset
def my_asset(context: dg.AssetExecutionContext):
context.log.info("Hello, world!")


my_job = define_asset_job("my_job", selection=[my_asset])
my_job = dg.define_asset_job("my_job", selection=[my_asset])


# highlight-start
Expand All @@ -27,24 +19,22 @@ def check_for_new_files() -> List[str]:
return []


@sensor(job=my_job, minimum_interval_seconds=5)
@dg.sensor(job=my_job, minimum_interval_seconds=5)
def new_file_sensor():
new_files = check_for_new_files()
if new_files:
for filename in new_files:
yield RunRequest(run_key=filename)
yield dg.RunRequest(run_key=filename)
else:
yield SkipReason("No new files found")
yield dg.SkipReason("No new files found")
# highlight-end


defs = Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])
defs = dg.Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])


if __name__ == "__main__":
from dagster import materialize

new_file_sensor()
materialize(
dg.materialize(
[my_asset],
)
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from dagster import asset
import dagster as dg


# Warning! This is not the right way to create assets
@asset
@dg.asset
def download_files():
# Download files from S3, the web, etc.
...


@asset
@dg.asset
def unzip_files():
# Unzip files to local disk or persistent storage
...


@asset
@dg.asset
def load_data():
# Read data previously written and store in a data warehouse
...
Loading

0 comments on commit b4ad258

Please sign in to comment.