Skip to content

Commit

Permalink
Add Automation Page (#23659)
Browse files Browse the repository at this point in the history
This adds a "How to Automate Pipelines" guide. The idea behind this page
is that it serves as the entry point for Automation. We explain the
three ways to automate, each example offers links to a How-To guide with
examples for that concept, and potentially a concept page. Not every
idea needs a concept page. For example, Asset Sensors may only need a
Guide and Reference API documentation.

I did not include the section on Declarative Automation, but it would
also fall here.
  • Loading branch information
PedramNavid authored Aug 16, 2024
1 parent 1718a78 commit 4e1a1c9
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 104 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/build-docs-revamp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
paths:
- docs/docs-next
- .github/workflows/build-docs-revamp.yml

push:
branches:
- docs/revamp
Expand All @@ -13,7 +12,6 @@ on:
- .github/workflows/build-docs-revamp.yml

concurrency:
# Cancel in-progress runs on same branch
group: ${{ github.workflow}}-${{github.ref}}
cancel-in-progress: true

Expand Down
Empty file.
107 changes: 22 additions & 85 deletions docs/docs-next/docs/guides/automation.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ last_update:
author: Pedram Navid
---

# How To Automate Pipelines in Dagster

Automation is key to building reliable, efficient data pipelines. This guide covers the main ways to automate processes in Dagster, helping you choose the right method for your needs.
Automation is key to building reliable, efficient data pipelines.
This guide provides a simplified overview of the main ways to automate processes in Dagster,
helping you choose the right method for your needs. You will find links to more detailed guides for each method below.

## What You'll Learn

Expand All @@ -30,105 +30,49 @@ Before continuing, you should be familiar with:

Dagster offers several ways to automate pipeline execution:

1. Schedules - Run jobs at specified times
2. Sensors - Trigger runs based on events
3. Declarative Automation - Automatically materialize assets based on conditions
4. Asset Sensors - Trigger jobs when specific assets materialize
1. [Schedules](#schedules) - Run jobs at specified times
2. [Sensors](#sensors) - Trigger runs based on events
3. [Asset Sensors](#asset-sensors) - Trigger jobs when specific assets materialize

Let's look at each method in more detail.

## Schedules

Schedules allow you to run jobs at specified times, like "every Monday at 9 AM" or "daily at midnight."
A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a cron expression in order to define when the job should be run.
A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a [cron expression](https://en.wikipedia.org/wiki/Cron)
in order to define when the job should be run.

To make creating cron expressions easier, you can use an online tool like [Crontab Guru](https://crontab.guru/).

### When to use Schedules

- You need to run jobs at regular intervals
- You want a basic time-based automation method

### Basic Schedule Example

```python
from dagster import ScheduleDefinition, define_asset_job

# A job is a selection of assets that are grouped together for execution
daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"])
- You want basic time-based automation

# Create a schedule that runs the job daily at midnight
daily_schedule = ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *" # Runs at midnight daily
)
```
For examples of how to create schedules, see the [How-To Use Schedules](/guides/automation/schedules) guide.

View more detailed examples of schedules in the [How-To Use Schedules](/guides/automation/schedules)
and read more about how Schedules work in [About Schedules](/concepts/schedules).
For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page.

## Sensors

Sensors allow you to trigger runs based on events or conditions, like a new file arriving or an external system status change.

A sensor requires that you define a function that will
Like schedules, sensors operate on a selection of assets, known as [Jobs](/concepts/ops-jobs) and can either start a pipeline
through a Run or log a reason for not starting a pipeline using a SkipReason.

However, unlike schedules, sensors are triggered by events that you define.
You must provide a function that the sensor will use to determine if it should trigger a run.

### When to use Sensors

- You need event-driven automation
- You want to react to changes in external systems

### Basic Sensor Example

```python
from dagster import RunRequest, SensorDefinition, sensor

@asset
def my_asset():
...

my_job = define_asset_job("my_job", selection=[my_asset])

def check_for_new_files() -> List[str]:
return ["file1", "file2"]
For more examples of how to create sensors, see the [How-To Use Sensors](/guides/automation/sensors) guide.

@sensor(job=my_job)
def new_file_sensor():
new_files = check_for_new_files()
if new_files:
yield RunRequest(run_key=f"filename")
For more information about how Sensors work, see the [About Sensors](/concepts/sensors) concept page.

```

## 3. Declarative Automation

Declarative Automation allows you to automatically materialize assets when specified criteria are met, without needing to define explicit jobs.

### When to use Declarative Automation

- You're working primarily with assets
- You want a simpler, more declarative approach to automation

### Basic Declarative Automation Example

```python
from dagster import asset, AutoMaterializePolicy, AutoMaterializeRule

@asset(
auto_materialize_policy=AutoMaterializePolicy(
rules=[
# Materialize if upstream assets have changed
AutoMaterializeRule.materialize_on_parent_updated(),
# Materialize daily at 2 AM
AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
]
)
)
def my_asset():
# Asset computation logic here
pass
```

## 4. Asset Sensors
## Asset Sensors

Asset Sensors trigger jobs when specified assets are materialized, allowing you to create dependencies between jobs or code locations.

Expand All @@ -137,15 +81,8 @@ Asset Sensors trigger jobs when specified assets are materialized, allowing you
- You need to trigger jobs based on asset materializations
- You want to create dependencies between different jobs or code locations

### Basic Asset Sensor Example

```python
from dagster import AssetSensor, RunRequest, asset_sensor
For more examples of how to create asset sensors, see the [How-To Use Asset Sensors](/guides/automation/asset-sensors) guide.

@asset_sensor(asset_key=["raw_data"], job=process_raw_data_job)
def raw_data_sensor(context):
yield RunRequest(run_key=context.cursor)
```

## Choosing the Right Automation Method

Expand All @@ -171,4 +108,4 @@ Use this table to help guide your decision:
- Explore [complex sensor examples] - TODO ADD LINK
- Dive into [Declarative Automation best practices] - TODO ADD LINK

By understanding and effectively using these automation methods, you can build robust, efficient data pipelines that respond to your specific needs and constraints.
By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints.
10 changes: 10 additions & 0 deletions docs/docs-next/docs/guides/automation/asset-sensors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: Asset Sensors
sidebar_position: 50
---

### Basic Asset Sensor Example

<CodeExample filePath="guides/automation/simple-asset-sensor-example.py" language="python" title="Simple Asset Sensor Example" />

This Asset Sensor will trigger a run of `my_job` whenever the `asset_to_watch` asset is materialized.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Creating dynamic pipelines based on external data"
sidebar_position: 3
sidebar_position: 30
---

# Creating dynamic pipelines based on external data
# Creating dynamic pipelines based on external data

This file was deleted.

74 changes: 74 additions & 0 deletions docs/docs-next/docs/guides/automation/schedules.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
title: "Scheduling pipelines"
sidebar_label: "Running pipelines on a schedule"
sidebar_position: 10
---

## Basic Schedule Example

A basic schedule is defined by a `JobDefinition` and a `cron_schedule` using the `ScheduleDefinition` class.

<CodeExample filePath="guides/automation/simple-schedule-example.py" language="python" title="Simple Schedule Example" />

## How to Set Custom Timezones

By default, schedules without a timezone will run in UTC. If you want to run a schedule in a different timezone, you can
set the `timezone` parameter.

```python
ecommerce_schedule = ScheduleDefinition(
job=ecommerce_job,
cron_schedule="15 5 * * 1-5",
timezone="America/Los_Angeles",
)
```

## How to Create Partitioned Schedules

If you have a partitioned asset and job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`.
The schedule will execute as the same cadence specified by the partition definition.

```python
from dagster import (
asset,
build_schedule_from_partitioned_job,
define_asset_job,
DailyPartitionsDefinition,
)

daily_partition = DailyPartitionsDefinition(start_date="2024-05-20")


@asset(partitions_def=daily_partition)
def daily_asset(): ...

partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])

# highlight-start
# This partition will run daily
asset_partitioned_schedule = build_schedule_from_partitioned_job(
partitioned_asset_job,
)
# highlight-end

```

If you have a partitioned job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`.

```python
from dagster import build_schedule_from_partitioned_job, job


@job(config=partitioned_config)
def partitioned_op_job(): ...

# highlight-start
partitioned_op_schedule = build_schedule_from_partitioned_job(
partitioned_op_job,
)
# highlight-end
```

---

For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page.
18 changes: 18 additions & 0 deletions docs/docs-next/docs/guides/automation/sensors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
title: Sensor Examples
---

### Basic Sensor Example

This example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory.

The sensor runs every 5 seconds. If it finds new files, it starts a run of `my_job`. If not, it skips the run and logs "No new files found" in the Dagster UI.

<CodeExample filePath="guides/automation/simple-sensor-example.py" language="python" title="Simple Sensor Example" />

:::tip

By default, sensors aren't enabled when first deployed to a Dagster instance.
Click "Automation" in the top navigation to find and enable a sensor.

:::
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dagster import (
AssetExecutionContext,
AssetKey,
Definitions,
RunRequest,
asset,
asset_sensor,
define_asset_job,
)


@asset
def asset_to_watch(context: AssetExecutionContext):
context.log.info("Asset to watch")


@asset
def asset_to_trigger(context: AssetExecutionContext):
context.log.info("Asset to trigger")


my_job = define_asset_job("my_job", [asset_to_trigger])


# highlight-start
@asset_sensor(asset_key=AssetKey("asset_to_watch"), job_name="my_job")
def my_asset_sensor():
yield RunRequest()
# highlight-end


defs = Definitions(
assets=[asset_to_watch, asset_to_trigger],
jobs=[my_job],
sensors=[my_asset_sensor],
)
25 changes: 25 additions & 0 deletions docs/docs-next/docs/guides/automation/simple-schedule-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job


@asset
def customer_data(): ...


@asset
def sales_report(): ...


daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"])

# highlight-start
daily_schedule = ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
# highlight-end

defs = Definitions(
assets=[customer_data, sales_report],
jobs=[daily_refresh_job],
schedules=[daily_schedule],
)
Loading

2 comments on commit 4e1a1c9

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs-next ready!

✅ Preview
https://dagster-docs-next-cm3uh0xa2-elementl.vercel.app

Built with commit 4e1a1c9.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-i4yl8o1t1-elementl.vercel.app
https://dagster-docs-next.dagster.dagster-docs.io

Built with commit 4e1a1c9.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.