Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs] - Add custom automation conditions guide (DOC-322) #23442

Merged
merged 9 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,16 @@
},
{
"title": "Declarative Automation (Experimental)",
"path": "/concepts/automation/declarative-automation"
"children": [
{
"title": "Overview",
"path": "/concepts/automation/declarative-automation"
},
{
"title": "Customizing automation conditions",
"path": "/concepts/automation/declarative-automation/customizing-automation-conditions"
}
]
},
{
"title": "Asset Sensors",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def my_eager_asset(): ...
AssetSpec("my_cron_asset", automation_condition=AutomationCondition.cron("@daily"))
```

The core <PyObject object="AutomationCondition" /> framework is extremely flexible, allowing you to build custom conditions from the ground up. Refer to the Customizing automation conditions guide for more information.
The core <PyObject object="AutomationCondition" /> framework is extremely flexible, allowing you to build custom conditions from the ground up. Refer to the [Customizing automation conditions guide](/concepts/automation/declarative-automation/customizing-automation-conditions) for more information.

### Sensors

Expand Down Expand Up @@ -184,6 +184,10 @@ From here, you can:
## Related

<ArticleList>
<ArticleListItem
title="Customizing automation conditions"
href="/concepts/automation/declarative-automation/customizing-automation-conditions"
></ArticleListItem>
<ArticleListItem
title="Asset definitions"
href="/concepts/assets/software-defined-assets"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
---
title: "Creating custom Declarative Automation conditions | Dagster Docs"
description: "Learn to create your own custom Declarative Automation conditions."
---

# Creating custom Declarative Automation conditions

<Note>
Declarative Automation is currently <strong>experimental</strong>.
</Note>

[Declarative Automation](/concepts/automation/declarative-automation) includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but you can also customize conditions.

By the end of this guide, you'll understand how <PyObject object="AutomationCondition" pluralize /> work and how to create your own custom conditions.

---

## Prerequisites

Before continuing, you should be familiar with:

- [Asset definitions](/concepts/assets/software-defined-assets)
- [Declarative Automation](/concepts/automation/declarative-automation)

---

## How it works

Each <PyObject object="AutomationCondition" /> consists of a set of **operands** and various **operators**. To create conditions that suit your specific needs, you can combine the operators and operands listed below. For example:

```python
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
AutomationCondition.in_progress() | AutomationCondition.failed()
)
```

This condition translates to **Any upstream dependencies (parents) part of an in-progress run or failed during the latest run**.

### Operands

Operands are base conditions which can be true or false about a given asset partition.

| Operand | Description |
| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| `AutomationCondition.missing` | Returns true if the asset partition has never been materialized or observed |
| `AutomationCondition.in_progress` | Returns true if the asset partition is part of an in-progress run |
| `AutomationCondition.failed` | Returns true if the asset partition failed to be materialized in its latest run |
| `AutomationCondition.newly_updated` | Returns true if the asset partition was materialized since the previous evaluation |
| `AutomationCondition.newly_requested` | Returns true if the asset partition was requested on the previous evaluation |
| `AutomationCondition.code_version_changed` | Returns true if the asset has a new code version since the previous evaluation |
| `AutomationCondition.cron_tick_passed` | Returns true if a new tick of the provided cron schedule occurred since the previous evaluation |
| `AutomationCondition.in_latest_time_window` | Returns true if the asset partition falls within the latest time window of the asset’s <PyObject object="PartitionsDefinition" />, if applicable. |
| `AutomationCondition.will_be_requested` | Returns true if the asset partition will be requested in this tick |

### Operators

The above conditions can be built into more complex expressions using the following operators:

<table
className="table"
style={{
width: "100%",
}}
>
<thead>
<tr>
<th
style={{
width: "40%",
}}
>
Operator
</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>~</code> (tilde)
</td>
<td>
NOT; condition is not true; ex: <code>~A</code>
</td>
</tr>
<tr>
<td>
<code>|</code> (pipe)
</td>
<td>
OR; either condition must be true; ex: <code>A | B</code>
</td>
</tr>
<tr>
<td>
<code>&</code> (ampersand)
</td>
<td>
AND; both conditions must be true; ex: <code>A & B</code>
</td>
</tr>
<tr>
<td>
<code>A.newly_true()</code>
</td>
<td>False on previous tick and is now true</td>
</tr>
<tr>
<td>
<code>A.since(B)</code>
</td>
<td>
Condition A became true more recently than Condition B. Refer to the{" "}
<a href="#using-statuses-and-events-in-conditions">
Using statuses and events in conditions
</a>{" "}
section for an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_deps_match(A)</code>
</td>
<td>
True for any upstream partition. Can be used with <code>.allow()</code>{" "}
and <code>.ignore()</code> to target specific upstream assets. Refer to
the <a href="#targeting-dependencies">Targeting dependencies</a> section
for an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.all_deps_match(A)</code>
</td>
<td>
True for at least one partition of each upstream asset. Can be used with{" "}
<code>.allow()</code> and <code>.ignore()</code> to target specific
upstream assets. Refer to the{" "}
<a href="#targeting-dependencies">Targeting dependencies</a> section for
an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_downstream_condition()</code>
</td>
<td>
Any <PyObject object="AutomationCondition" /> on a downstream asset
evaluates to true
</td>
</tr>
</tbody>
</table>

---

## Targeting dependencies

Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the `AutomationCondition.any_deps_match()` operator. This operator takes an arbitrary <PyObject object="AutomationCondition" />, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.

This operator and `AutomationCondition.all_deps_match()` can be further customized to only target specific sets of upstream assets by using `.allow()` and `.ignore()`.

For example, to target updates from a specific asset group, you can use `any_deps_match` with the `newly_updated` operand and tell it to target only the `metrics` asset group:

```python
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))
```

Or to ignore missing partitions from an upstream asset, you can use `any_deps_match` with the `missing` operand and tell it to ignore a specific asset:

```python
AutomationCondition.any_deps_match(
AutomationCondition.missing()
).ignore(AssetSelection.keys("taxi_trips"))
```

---

## Describing conditions with labels

When there are a large number of sub-conditions that make up an <PyObject object="AutomationCondition" />, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then display in the Dagster UI.

Arbitrary string labels can be attached to any node in the <PyObject object="AutomationCondition" /> tree by using the `with_label()` method, allowing you to describe the purpose of a specific sub-condition. For example:

```python
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")
```

Then, when viewing evaluation results in the UI, the label will display next to the condition:

<!-- ![Any parents in progress or failed condition label in the Dagster UI](/images/concepts/automation/declarative-automation/condition-label.png) -->

<Image
alt="Any parents in progress or failed condition label in the Dagster UI"
src="/images/concepts/automation/declarative-automation/condition-label.png"
width={1576}
height={418}
/>

Hovering over or expanding the label will display its sub-conditions:

<!-- ![Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI](/images/concepts/automation/declarative-automation/condition-label-expanded.png) -->

<Image
alt="Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI"
src="/images/concepts/automation/declarative-automation/condition-label-expanded.png"
width={1576}
height={593}
/>

---

## Using statuses and events in conditions

In some cases, you may want to use statuses and events in your automation conditions:

- **Statuses** are persistent states that are and will be true for some period of time. For example, the `AutomationCondition.missing()` condition will be true only if an asset partition has never been materialized or observed.
- **Events** are transient and reflect something that may only be true for an instant. For example, the `AutomationCondition.newly_updated()` condition will be true only if an asset partition was materialized since the previous evaluation.

Using the `<A>.since(<B>)` operator, you can create conditions that detect if one event has happened more recently than another. Think of this as converting two events to a status - in this case, `A has occurred more recently than B` - as this will stay true for some period of time. This operator becomes true whenever `<A>` is true, and will remain true until `<B>` is also true.

Conversely, it can also be useful to convert statuses to events. For example, the default `eager()` condition ensures that Dagster only tries to materialize a missing asset partition once using the following sub-condition:

```python
from dagster import AutomationCondition

AutomationCondition.missing().newly_true().since(
AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
```

By using the `<A>.newly_true()` operator, you can turn the status of _"being missing"_ into a single event, specifically the point in time where an asset partition entered the _missing_ state. From there, you can ensure that an asset is materialized only once in response to detecting a missing partition.

---

## Using conditions to chain runs

Dagster can group the execution of multiple assets into a single, logical run. For example, imagine you have a series of dependent assets, each with an `AutomationCondition.eager()` condition. When you update the first asset in the chain, the desired behavior is typically to have all downstream assets grouped into a single run, rather than executing each asset in order in individual run.

To create this scenario, you can use `AutomationCondition.will_be_requested()`. Because each <PyObject object="AutomationCondition" /> is evaluated in order, you can query if an upstream asset will be requested on the current tick. For example:

```python
from dagster import AutomationCondition

any_parent_missing = AutomationCondition.any_deps_match(
AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
)
```

---

## Related

<ArticleList>
<ArticleListItem
title="Asset definitions"
href="/concepts/assets/software-defined-assets"
></ArticleListItem>
<ArticleListItem
title="Declarative Automation"
href="/concepts/automation/declarative-automation"
></ArticleListItem>
<ArticleListItem
title="Automation"
href="/concepts/automation"
></ArticleListItem>
<ArticleListItem
title="Schedules"
href="/concepts/automation/schedules"
></ArticleListItem>
</ArticleList>
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.