[docs-revamp] - Clean up Sensors guide (#24462)
## Summary & Motivation

## How I Tested These Changes

## Changelog

NOCHANGELOG

---------

Co-authored-by: Colton Padden <[email protected]>
erinkcochran87 and cmpadden authored Sep 19, 2024
1 parent a4164ca commit a48b3e6
Showing 3 changed files with 124 additions and 14 deletions.
70 changes: 57 additions & 13 deletions docs/docs-beta/docs/guides/sensors.md
@@ -1,33 +1,77 @@
---
title: Creating event-based pipelines with sensors
sidebar_label: Event triggers
sidebar_position: 20
---

Sensors enable you to trigger Dagster runs in response to events from external systems. They run at regular intervals, either triggering a run or explaining why a run was skipped. For example, you can trigger a run when a new file is added to an Amazon S3 bucket or when a database row is updated.

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- Familiarity with [Assets](/concepts/assets)
- Familiarity with [Ops and Jobs](/concepts/ops-jobs)
</details>

## Basic sensor

Sensors are defined with the `@sensor` decorator. The following example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory.

If the sensor finds new files, it starts a run of `my_job`. If not, it skips the run and logs `No new files found` in the Dagster UI.

<CodeExample filePath="guides/automation/simple-sensor-example.py" language="python" />

:::tip
Unless a sensor has a `default_status` of `DefaultSensorStatus.RUNNING`, it won't be enabled when first deployed to a Dagster instance. To find and enable the sensor, click **Automation > Sensors** in the Dagster UI.
:::
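
If you want a sensor to be enabled as soon as it's deployed, you can set `default_status` when defining it. The following is a minimal sketch, assuming `import dagster as dg` and the `my_job` job from the basic example; the sensor name is illustrative:

```python
@dg.sensor(
    job=my_job,
    # Start the sensor automatically when deployed, no UI action needed
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def always_on_sensor(): ...
```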

## Customizing intervals between evaluations

The `minimum_interval_seconds` argument specifies the minimum number of seconds that must elapse between sensor evaluations, ensuring that the sensor isn't evaluated more frequently than the specified interval.

Note that this interval is a minimum between evaluations, not an exact frequency. If an evaluation takes longer to complete than the specified interval, the next evaluation will be delayed accordingly.

```python
# Sensor will be evaluated at least every 30 seconds
@dg.sensor(job=my_job, minimum_interval_seconds=30)
def new_file_sensor():
    ...
```

In this example, if the `new_file_sensor`'s evaluation function takes less than a second to run, you can expect the sensor to run consistently around every 30 seconds. However, if the evaluation function takes longer, the interval between evaluations will be longer.

## Preventing duplicate runs

To prevent duplicate runs, you can use run keys to uniquely identify each `RunRequest`. In the [previous example](#basic-sensor), the `RunRequest` was constructed with a `run_key`:

```python
yield dg.RunRequest(run_key=filename)
```

For a given sensor, a single run is created for each `RunRequest` with a unique `run_key`. Dagster will skip processing requests with previously used run keys, ensuring that duplicate runs won't be created.
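
For illustration, here's a sketch that reuses `my_job` and the `check_for_new_files` helper from the basic example (the sensor name is hypothetical). Yielding a `RunRequest` whose `run_key` was already used in an earlier evaluation won't create another run:

```python
@dg.sensor(job=my_job)
def deduplicating_file_sensor():
    for filename in check_for_new_files():
        # If a run was already launched for this run_key, Dagster skips the
        # request instead of creating a duplicate run
        yield dg.RunRequest(run_key=filename)
```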

## Cursors and high-volume events

When dealing with a large number of events, you may want to implement a cursor to optimize sensor performance. Unlike run keys, cursors allow you to implement custom logic that manages state.

The following example demonstrates how you might use a cursor to only create `RunRequests` for files in a directory that have been updated since the last time the sensor ran.

<CodeExample filePath="guides/automation/sensor-cursor.py" language="python" />

For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams; one way to do this is sketched after the following note.

:::note
The preceding example uses both a `run_key` and a cursor, which means that if the cursor is reset but the files don't change, new runs won't be launched. This is because the run keys associated with the files won't change.

If you want to be able to reset a sensor's cursor, don't set `run_key`s on `RunRequest`s.
:::
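
As mentioned above, one possible approach for multiple streams is to store a JSON-encoded dictionary of per-stream offsets in the cursor. In this sketch, `my_job` is the job from the earlier examples, while `get_new_events` and the stream names are hypothetical:

```python
import json

import dagster as dg


@dg.sensor(job=my_job)
def multi_stream_sensor(context: dg.SensorEvaluationContext):
    # Deserialize the cursor into per-stream offsets; start from zero on the first evaluation
    offsets = json.loads(context.cursor) if context.cursor else {"orders": 0, "payments": 0}

    for stream, offset in offsets.items():
        # get_new_events is a hypothetical helper that returns (event_id, payload)
        # pairs for events in the stream that occurred after the given offset
        for event_id, _payload in get_new_events(stream, after=offset):
            yield dg.RunRequest(run_key=f"{stream}:{event_id}")
            offsets[stream] = max(offsets[stream], event_id)

    # Serialize the updated offsets back into the cursor string
    context.update_cursor(json.dumps(offsets))
```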

## Next steps

By understanding and effectively using sensors and Dagster's other automation methods, you can build event-driven data pipelines that respond to your specific needs and constraints.

- Run pipelines on a [schedule](/guides/schedules)
- Trigger cross-job dependencies with [asset sensors](/guides/asset-sensors)
- Explore [Declarative Automation](/concepts/automation/declarative-automation) as an alternative to sensors
56 changes: 56 additions & 0 deletions (new file; the sensor cursor example referenced above as guides/automation/sensor-cursor.py)
@@ -0,0 +1,56 @@
import os

import dagster as dg

MY_DIRECTORY = "data"


@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Hello, world!")


my_job = dg.define_asset_job("my_job", selection=[my_asset])


@dg.sensor(
    job=my_job,
    minimum_interval_seconds=5,
    default_status=dg.DefaultSensorStatus.RUNNING,
)
# highlight-start
# Enable sensor context
def updated_file_sensor(context):
    # Get current cursor value from sensor context
    last_mtime = float(context.cursor) if context.cursor else 0
    # highlight-end

    max_mtime = last_mtime

    # Loop through directory
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            # Get the file's last modification time (st_mtime)
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime

            # If the file hasn't been updated since the last evaluation, skip it
            if file_mtime <= last_mtime:
                continue

            # Construct the RunRequest with run_key and config
            run_key = f"{filename}:{file_mtime}"
            run_config = {"ops": {"my_asset": {"config": {"filename": filename}}}}
            yield dg.RunRequest(run_key=run_key, run_config=run_config)

            # highlight-start
            # Keep the larger value of max_mtime and file last updated
            max_mtime = max(max_mtime, file_mtime)

    # Update the cursor
    context.update_cursor(str(max_mtime))
    # highlight-end


defs = dg.Definitions(assets=[my_asset], jobs=[my_job], sensors=[updated_file_sensor])
12 changes: 11 additions & 1 deletion (the simple sensor example referenced above as guides/automation/simple-sensor-example.py; earlier lines collapsed)
@@ -4,27 +4,37 @@
import dagster as dg


# Define the asset
@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Hello, world!")


# Define asset job
my_job = dg.define_asset_job("my_job", selection=[my_asset])


# highlight-start
# Define file check
def check_for_new_files() -> List[str]:
    if random.random() > 0.5:
        return ["file1", "file2"]
    return []


# Define the sensor
@dg.sensor(
    job=my_job,
    minimum_interval_seconds=5,
    default_status=dg.DefaultSensorStatus.RUNNING,  # Sensor is turned on by default
)
def new_file_sensor():
    new_files = check_for_new_files()
    # New files, run `my_job`
    if new_files:
        for filename in new_files:
            yield dg.RunRequest(run_key=filename)
    # No new files, skip the run and log the reason
    else:
        yield dg.SkipReason("No new files found")
# highlight-end
