Automation Part 2 #23723
Merged — 14 commits, Aug 20, 2024
from dagster import (
    AssetExecutionContext,
    AssetKey,
    AssetMaterialization,
    Definitions,
    MaterializeResult,
    RunRequest,
    SensorEvaluationContext,
    SkipReason,
    asset,
    asset_sensor,
    define_asset_job,
)


@asset
def daily_sales_data(context: AssetExecutionContext):
    context.log.info("Asset to watch, perhaps some function sets metadata here")
    yield MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext):
    context.log.info("Running weekly report")


my_job = define_asset_job("my_job", [weekly_report])


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
    # Provide a type hint on the underlying event
    materialization: AssetMaterialization = (
        asset_event.dagster_event.event_specific_data.materialization
    )

    # Example custom logic: Check if the asset metadata has a specific property
    # highlight-start
    if "specific_property" in materialization.metadata:
        context.log.info("Triggering job based on custom evaluation logic")
        yield RunRequest(run_key=context.cursor)
    else:
        yield SkipReason("Asset materialization does not have the required property")
    # highlight-end


defs = Definitions(
    assets=[daily_sales_data, weekly_report],
    jobs=[my_job],
    sensors=[daily_sales_data_sensor],
)
from dagster import (
    AssetExecutionContext,
    AssetKey,
    AssetMaterialization,
    Config,
    Definitions,
    MaterializeResult,
    RunConfig,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
    define_asset_job,
)


class MyConfig(Config):
    param1: str


@asset
def daily_sales_data(context: AssetExecutionContext):
    context.log.info("Asset to watch")
    # highlight-next-line
    yield MaterializeResult(metadata={"specific_property": "value"})


@asset
def weekly_report(context: AssetExecutionContext, config: MyConfig):
    context.log.info(f"Running weekly report with param1: {config.param1}")


my_job = define_asset_job(
    "my_job",
    [weekly_report],
    config=RunConfig(ops={"weekly_report": MyConfig(param1="value")}),
)


@asset_sensor(asset_key=AssetKey("daily_sales_data"), job=my_job)
def daily_sales_data_sensor(context: SensorEvaluationContext, asset_event):
    materialization: AssetMaterialization = (
        asset_event.dagster_event.event_specific_data.materialization
    )

    # Example custom logic: Check if the asset metadata has a specific property
    # highlight-start
    if "specific_property" in materialization.metadata:
        yield RunRequest(
            run_key=context.cursor,
            run_config=RunConfig(
                ops={
                    "weekly_report": MyConfig(
                        param1=str(materialization.metadata.get("specific_property"))
                    )
                }
            ),
        )
    # highlight-end


defs = Definitions(
    assets=[daily_sales_data, weekly_report],
    jobs=[my_job],
    sensors=[daily_sales_data_sensor],
)
from dagster import (
    AssetKey,
    MultiAssetSensorEvaluationContext,
    RunRequest,
    asset,
    define_asset_job,
    multi_asset_sensor,
)


@asset
def target_asset():
    pass


downstream_job = define_asset_job("downstream_job", [target_asset])


@multi_asset_sensor(
    monitored_assets=[
        AssetKey("upstream_asset_1"),
        AssetKey("upstream_asset_2"),
    ],
    job=downstream_job,
)
def my_multi_asset_sensor(context: MultiAssetSensorEvaluationContext):
    run_requests = []
    for asset_key, materialization in context.latest_materialization_records_by_key().items():
        if materialization:
            run_requests.append(RunRequest(asset_selection=[asset_key]))
            context.advance_cursor({asset_key: materialization})
    return run_requests
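The cursor bookkeeping that `advance_cursor` performs can be pictured without the framework. The sketch below is an illustrative model, not Dagster's implementation: the sensor remembers the newest event it has already handled per asset, and only strictly newer events produce run requests.

```python
def evaluate_multi_asset_sensor(latest_event_ids, cursor):
    """Return (asset_keys_to_run, new_cursor).

    latest_event_ids: mapping of asset key -> newest event id (or None if the
        asset has never materialized).
    cursor: mapping of asset key -> last event id already handled.
    """
    run_requests = []
    new_cursor = dict(cursor)
    for asset_key, event_id in latest_event_ids.items():
        # Only request a run for materializations newer than the cursor.
        if event_id is not None and event_id != cursor.get(asset_key):
            run_requests.append(asset_key)
            new_cursor[asset_key] = event_id  # advance past this event
    return run_requests, new_cursor
```

Calling this twice with the same event ids shows why advancing the cursor matters: the second evaluation requests nothing new, so the sensor does not re-trigger runs for events it has already consumed.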
from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

daily_partition = DailyPartitionsDefinition(start_date="2024-05-20")


@asset(partitions_def=daily_partition)
def daily_asset(): ...


partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])

# highlight-start
# This schedule will run daily, targeting the most recent partition
asset_partitioned_schedule = build_schedule_from_partitioned_job(
    partitioned_asset_job,
)
# highlight-end
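Conceptually, a `DailyPartitionsDefinition` enumerates one partition key per day starting from its start date, and the schedule built from the partitioned job kicks off a run for the newest partition as each day completes. A rough stdlib model of that keying (not Dagster's actual partition logic):

```python
from datetime import date, timedelta


def daily_partition_keys(start_date: str, until: str) -> list[str]:
    """Enumerate YYYY-MM-DD partition keys from start_date up to (but not
    including) `until`, mirroring how a daily partitions definition is keyed."""
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(until)
    keys = []
    current = start
    while current < end:
        keys.append(current.isoformat())
        current += timedelta(days=1)
    return keys


# The most recent key is what the derived daily schedule would target next.
keys = daily_partition_keys("2024-05-20", "2024-05-23")
```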
67 changes: 66 additions & 1 deletion docs/docs-next/docs/concepts/automation.md
---
title: About Automation
---

Dagster supports several types of automation. All automation in Dagster responds to some external trigger, such as the passage of time or an event in another system.

The first and most basic is the [Schedule](/guides/automation/schedules), which responds to time.

[Sensors](/guides/automation/sensors) are like schedules, but they respond to an external event defined by the user.

[Asset Sensors](/guides/automation/asset-sensors) are a special case of sensor that responds to changes in asset materializations, as reported by the Event Log.

Finally, Declarative Automation is a more sophisticated system that uses conditions defined on the assets themselves to determine when to execute.
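The core idea behind condition-driven execution can be sketched independently of Dagster's actual API (all names below are hypothetical): each asset carries a predicate over its current state, and the system materializes whichever assets' predicates currently hold.

```python
def assets_to_execute(asset_states, conditions):
    """asset_states: asset key -> state dict describing that asset right now.
    conditions: asset key -> predicate over that state.
    Returns the asset keys whose condition currently holds."""
    return [key for key, state in asset_states.items() if conditions[key](state)]


states = {
    "report": {"upstream_updated": True, "in_progress": False},
    "summary": {"upstream_updated": False, "in_progress": False},
}
conditions = {
    # An "eager"-style condition: run when upstream changed and no run is in flight.
    "report": lambda s: s["upstream_updated"] and not s["in_progress"],
    "summary": lambda s: s["upstream_updated"] and not s["in_progress"],
}
```

The inversion of control is the point: instead of a schedule or sensor deciding when a job runs, each asset declares the circumstances under which it should be brought up to date.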

## About Schedules

In Dagster, a schedule is defined by the `ScheduleDefinition` class or through the `@schedule` decorator. The `@schedule` decorator is more flexible than the `ScheduleDefinition` class, allowing you to configure job behavior or emit log messages as the schedule is processed.

Schedules were one of the first types of automation in Dagster, created before the introduction of Software-Defined Assets. As such, many of the examples may feel foreign if you have only ever worked within the asset framework.

For more on how assets and ops interrelate, read about [Assets and Ops](/concepts/assets#assets-and-ops).

The `dagster-daemon` process is responsible for submitting runs by checking each schedule at a regular interval to determine
if it's time to execute the underlying job.

A schedule can be thought of as a wrapper around two pieces:

- A `JobDefinition`, which is a set of assets to materialize or ops to execute.
- A `cron` string, which describes the schedule.
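That two-piece structure can be pictured as a plain pairing of data — the class below is an illustrative stand-in with hypothetical names, not a Dagster class:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScheduleSketch:
    """Illustrative model of a schedule: the job to run, plus a cron string
    telling the daemon when to submit it."""

    job_name: str
    cron_schedule: str


# "At 05:15 on Monday": the daemon checks the cron string on each tick and
# submits the named job when it matches.
weekly = ScheduleSketch(job_name="weekly_report_job", cron_schedule="15 5 * * 1")
```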

### Defining a schedule using `ScheduleDefinition`


```python
ecommerce_schedule = ScheduleDefinition(
    job=ecommerce_job,
    cron_schedule="15 5 * * 1-5",
)
```

By default, schedules aren't enabled. You can enable them by visiting the Automation tab in the Dagster UI and toggling the schedule, or by setting a default status of `RUNNING` when you define the schedule.

```python
ecommerce_schedule = ScheduleDefinition(
    job=ecommerce_job,
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)
```

### Defining a schedule using `@schedule`

For more advanced scheduling needs, you can use the `@schedule` decorator, which gives you more control: you implement custom logic that determines whether the schedule should run. You are then responsible for returning either a `RunRequest`, which launches the job, or a `SkipReason`, which explains why the run was skipped. You can also emit logs, which will be visible in the Dagster UI for a given schedule's tick history.

```python
@schedule(cron_schedule="15 5 * * 1-5")
def ecommerce_schedule(context):
    context.log.info("This log message will be visible in the Dagster UI.")
    return RunRequest()
```
6 changes: 0 additions & 6 deletions docs/docs-next/docs/concepts/automation/schedules.md
@@ -1,6 +0,0 @@
---
title: "Schedules"
sidebar_position: 10
---

# Schedules
40 changes: 21 additions & 19 deletions docs/docs-next/docs/guides/automation.md
Dagster offers several ways to automate pipeline execution:
## Schedules

Schedules allow you to run jobs at specified times, like "every Monday at 9 AM" or "daily at midnight."
A schedule combines a selection of assets, known as a [job](/concepts/ops-jobs), and a [cron expression](https://en.wikipedia.org/wiki/Cron) to define when the job should be run.

To make creating cron expressions easier, you can use an online tool like [Crontab Guru](https://crontab.guru/).
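To build intuition for what the five cron fields encode, here is a minimal expander for a single field. It supports `*`, lists, ranges, and `*/step`; real cron implementations handle more (names, combined forms, day-of-month/day-of-week interaction):

```python
def expand_cron_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field (e.g. minute, hour, day-of-week) into the set
    of values it matches, given that field's valid range [lo, hi]."""
    values = set()
    for part in field.split(","):
        part, _, step = part.partition("/")
        step = int(step) if step else 1
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start, end = (int(x) for x in part.split("-"))
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return values


# "15 5 * * 1-5" fires at 05:15 on Monday through Friday:
weekdays = expand_cron_field("1-5", 0, 6)  # day-of-week field
```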

For more information about how Schedules work, see the About Schedules concept page.

## Sensors

Sensors allow you to trigger runs based on events or conditions that you define, like a new file arriving or an external system status change.


However, unlike schedules, sensors are triggered by events that you define.
You must provide a function that the sensor will use to determine if it should trigger a run.
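That contract — an evaluation function that either requests a run or explains a skip — can be sketched without the framework. The names below are hypothetical; in Dagster, the analogues are `RunRequest` and `SkipReason`:

```python
def new_file_sensor(seen_files: set, current_files: set) -> dict:
    """One sensor evaluation: request a run for each unseen file,
    otherwise report why nothing was launched."""
    new_files = current_files - seen_files
    if new_files:
        return {"action": "run", "run_keys": sorted(new_files)}
    return {"action": "skip", "reason": "no new files"}
```

The daemon would call a function like this on a regular interval, persisting `seen_files` between evaluations (Dagster does this with the sensor cursor) so each file triggers at most one run.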

Like schedules, sensors operate on a selection of assets, known as [jobs](/concepts/ops-jobs), and can either start a pipeline through a run or log a reason for not starting a pipeline using a `SkipReason`.

### When to use sensors

- You need event-driven automation
- You want to react to changes in external systems

For more examples of how to create sensors, see the [How-To Use Sensors](/guides/automation/sensors) guide.

For more information about how sensors work, see [About Sensors](/concepts/sensors).


## Asset sensors

Asset sensors trigger jobs when specified assets are materialized, allowing you to create dependencies between jobs or code locations.

### When to use asset sensors

- You need to trigger jobs based on asset materializations
- You want to create dependencies between different jobs or code locations

For more examples of how to create asset sensors, see the [How-To Use Asset Sensors](/guides/automation/asset-sensors) guide.

## Declarative automation

TODO: Add content

## Choose an automation method

Consider these factors when selecting an automation method:


Use this table to help guide your decision:

| Method | Best For | Works With |
| ---------------------- | -------------------------------------- | ------------------- |
| Schedules | Regular, time-based job runs | Assets, Ops, Graphs |
| Sensors | Event-driven automation | Assets, Ops, Graphs |
| Declarative Automation | Asset-centric, condition-based updates | Assets only |
| Asset Sensors | Cross-job/location asset dependencies | Assets only |

## Next steps

- Learn more about [advanced scheduling patterns] - TODO ADD LINK
- Explore [complex sensor examples] - TODO ADD LINK
- Dive into [Declarative Automation best practices] - TODO ADD LINK

By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints.