Skip to content

Commit

Permalink
Add automation guide page and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
PedramNavid committed Aug 14, 2024
1 parent 0285086 commit b0d2288
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 79 deletions.
103 changes: 29 additions & 74 deletions docs/docs-next/docs/guides/automation.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ last_update:

# How To Automate Pipelines in Dagster

Automation is key to building reliable, efficient data pipelines. This guide covers the main ways to automate processes in Dagster, helping you choose the right method for your needs.
Automation is key to building reliable, efficient data pipelines.
This guide provides a simplified overview of the main ways to automate processes in Dagster,
helping you choose the right method for your needs. You will find links to more detailed guides for each method below.

## What You'll Learn

Expand All @@ -27,53 +29,46 @@ Before continuing, you should be familiar with:
</details>




## Automation Methods Overview

Dagster offers several ways to automate pipeline execution:

1. Schedules - Run jobs at specified times
2. Sensors - Trigger runs based on events
3. Declarative Automation - Automatically materialize assets based on conditions
4. Asset Sensors - Trigger jobs when specific assets materialize
1. [Schedules](#schedules) - Run jobs at specified times
2. [Sensors](#sensors) - Trigger runs based on events
3. [Asset Sensors](#asset-sensors) - Trigger jobs when specific assets materialize

Let's look at each method in more detail.

## Schedules

Schedules allow you to run jobs at specified times, like "every Monday at 9 AM" or "daily at midnight."
A schedule combines a selection of assets, known as a [Job](/concepts/jobs), and a cron expression in order to define when the job should be run.
A schedule combines a selection of assets, known as a [Job](/concepts/jobs), and a [cron expression](https://en.wikipedia.org/wiki/Cron)
in order to define when the job should be run.

To make creating cron expressions easier, you can use an online tool like [Crontab Guru](https://crontab.guru/).

### When to use Schedules

- You need to run jobs at regular intervals
- You want a basic time-based automation method
- You want basic time-based automation

### Basic Schedule Example

```python
from dagster import ScheduleDefinition, define_asset_job
<CodeExample filePath="guides/automation/simple-schedule-example.py" language="python" title="Simple Schedule Example" />

# A job is a selection of assets that are grouped together for execution
daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"])
For more examples of schedules, see the [How-To Use Schedules](/guides/automation/schedules) guide.

# Create a schedule that runs the job daily at midnight
daily_schedule = ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *" # Runs at midnight daily
)
```

View more detailed examples of schedules in the [How-To Use Schedules](/guides/automation/schedules)
and read more about how Schedules work in [About Schedules](/concepts/schedules).
For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page.

## Sensors

Sensors allow you to trigger runs based on events or conditions, like a new file arriving or an external system status change.

A sensor requires that you define a function that will
Like schedules, sensors operate on a selection of assets, known as [Jobs](/concepts/jobs) and can either start a pipeline
through a [Run](/concepts/runs) or log a reason for not starting a pipeline using a [SkipReason](/concepts/sensors#skip-reasons).

However, unlike schedules, sensors are triggered by events that you define.
You must provide a function that the sensor will use to determine if it should trigger a run.

### When to use Sensors

Expand All @@ -82,56 +77,20 @@ A sensor requires that you define a function that will

### Basic Sensor Example

```python
from dagster import RunRequest, SensorDefinition, sensor

@asset
def my_asset():
...

my_job = define_asset_job("my_job", selection=[my_asset])

def check_for_new_files() -> List[str]:
return ["file1", "file2"]

@sensor(job=my_job)
def new_file_sensor():
new_files = check_for_new_files()
if new_files:
yield RunRequest(run_key=f"filename")

```

## 3. Declarative Automation

Declarative Automation allows you to automatically materialize assets when specified criteria are met, without needing to define explicit jobs.
This example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory.

### When to use Declarative Automation
The sensor runs every 5 seconds. If it finds new files, it starts a run of `my_job`. If not, it skips the run and logs "No new files found" in the Dagster UI.

- You're working primarily with assets
- You want a simpler, more declarative approach to automation
<CodeExample filePath="guides/automation/simple-sensor-example.py" language="python" title="Simple Sensor Example" />

### Basic Declarative Automation Example
:::tip

```python
from dagster import asset, AutoMaterializePolicy, AutoMaterializeRule
By default, sensors aren't enabled when first deployed to a Dagster instance.
Click "Automation" in the top navigation to find and enable a sensor.

@asset(
auto_materialize_policy=AutoMaterializePolicy(
rules=[
# Materialize if upstream assets have changed
AutoMaterializeRule.materialize_on_parent_updated(),
# Materialize daily at 2 AM
AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
]
)
)
def my_asset():
# Asset computation logic here
pass
```
:::

## 4. Asset Sensors
## Asset Sensors

Asset Sensors trigger jobs when specified assets are materialized, allowing you to create dependencies between jobs or code locations.

Expand All @@ -142,13 +101,9 @@ Asset Sensors trigger jobs when specified assets are materialized, allowing you

### Basic Asset Sensor Example

```python
from dagster import AssetSensor, RunRequest, asset_sensor
<CodeExample filePath="guides/automation/simple-asset-sensor-example.py" language="python" title="Simple Asset Sensor Example" />

@asset_sensor(asset_key=["raw_data"], job=process_raw_data_job)
def raw_data_sensor(context):
yield RunRequest(run_key=context.cursor)
```
This Asset Sensor will trigger a run of `my_job` whenever the `asset_to_watch` asset is materialized.

## Choosing the Right Automation Method

Expand All @@ -174,4 +129,4 @@ Use this table to help guide your decision:
- Explore [complex sensor examples](link-to-sensor-examples)
- Dive into [Declarative Automation best practices](link-to-declarative-automation)

By understanding and effectively using these automation methods, you can build robust, efficient data pipelines that respond to your specific needs and constraints.
By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dagster import (
AssetExecutionContext,
AssetKey,
Definitions,
RunRequest,
asset,
asset_sensor,
define_asset_job,
)


@asset
def asset_to_watch(context: AssetExecutionContext):
context.log.info("Asset to watch")


@asset
def asset_to_trigger(context: AssetExecutionContext):
context.log.info("Asset to trigger")


my_job = define_asset_job("my_job", [asset_to_trigger])


# highlight-start
@asset_sensor(asset_key=AssetKey("asset_to_watch"), job_name="my_job")
def my_asset_sensor():
yield RunRequest()
# highlight-end


defs = Definitions(
assets=[asset_to_watch, asset_to_trigger],
jobs=[my_job],
sensors=[my_asset_sensor],
)
25 changes: 25 additions & 0 deletions docs/docs-next/docs/guides/automation/simple-schedule-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job


@asset
def customer_data(): ...


@asset
def sales_report(): ...


daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"])

# highlight-start
daily_schedule = ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
# highlight-end

defs = Definitions(
assets=[customer_data, sales_report],
jobs=[daily_refresh_job],
schedules=[daily_schedule],
)
26 changes: 21 additions & 5 deletions docs/docs-next/docs/guides/automation/simple-sensor-example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import random
from typing import List

from dagster import AssetExecutionContext, Definitions, RunRequest, asset, define_asset_job, sensor
from dagster import (
AssetExecutionContext,
Definitions,
RunRequest,
SkipReason,
asset,
define_asset_job,
sensor,
)


@asset
Expand All @@ -11,15 +20,22 @@ def my_asset(context: AssetExecutionContext):
my_job = define_asset_job("my_job", selection=[my_asset])


# highlight-start
def check_for_new_files() -> List[str]:
return ["file1", "file2"]
if random.random() > 0.5:
return ["file1", "file2"]
return []


@sensor(target=my_job)
@sensor(target=my_job, minimum_interval_seconds=5)
def new_file_sensor():
new_files = check_for_new_files()
for filename in new_files:
yield RunRequest(run_key=filename)
if new_files:
for filename in new_files:
yield RunRequest(run_key=filename)
else:
yield SkipReason("No new files found")
# highlight-end


defs = Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])
Expand Down

0 comments on commit b0d2288

Please sign in to comment.