diff --git a/docs/docs-beta/docs/guides/sensors.md b/docs/docs-beta/docs/guides/sensors.md
index 17a8030a09290..6792926db0771 100644
--- a/docs/docs-beta/docs/guides/sensors.md
+++ b/docs/docs-beta/docs/guides/sensors.md
@@ -1,33 +1,77 @@
 ---
-title: Create event-based pipelines with sensors
-sidebar_label: Sensors
+title: Creating event-based pipelines with sensors
+sidebar_label: Event triggers
 sidebar_position: 20
 ---
 
-Sensors are a way to trigger runs in response to events in Dagster. Sensors
-run on a regular interval and can either trigger a run, or provide a reason why a run was skipped.
-
-Sensors allow you to respond to events in external systems. For example, you can trigger a run when a new file arrives in an S3 bucket, or when a row is updated in a database.
+Sensors enable you to trigger Dagster runs in response to events from external systems. They run at regular intervals, either triggering a run or explaining why a run was skipped. For example, you can trigger a run when a new file is added to an Amazon S3 bucket or when a database row is updated.
 ## Prerequisites
 
+To follow the steps in this guide, you'll need:
+
 - Familiarity with [Assets](/concepts/assets)
 - Familiarity with [Ops and Jobs](/concepts/ops-jobs)
-## Basic sensor example
-
-This example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory.
+## Basic sensor
 
-The sensor runs every 5 seconds. If it finds new files, it starts a run of `my_job`. If not, it skips the run and logs "No new files found" in the Dagster UI.
+Sensors are defined with the `@sensor` decorator. The following example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory.
+
+If the sensor finds new files, it starts a run of `my_job`. If not, it skips the run and logs `No new files found` in the Dagster UI.
 
+:::tip
+Unless a sensor has a `default_status` of `DefaultSensorStatus.RUNNING`, it won't be enabled when first deployed to a Dagster instance. To find and enable the sensor, click **Automation > Sensors** in the Dagster UI.
+:::
+
+## Customizing intervals between evaluations
+
+The `minimum_interval_seconds` argument specifies the minimum number of seconds that will elapse between sensor evaluations, meaning the sensor won't be evaluated more frequently than that interval.
+
+Note that this is a minimum interval between evaluations, not the exact frequency at which the sensor runs. If an evaluation takes longer than the specified interval, the next evaluation will be delayed accordingly.
+
+```python
+# Sensor will be evaluated at least every 30 seconds
+@dg.sensor(job=my_job, minimum_interval_seconds=30)
+def new_file_sensor():
+    ...
+```
+
+In this example, if `new_file_sensor`'s evaluation function takes less than a second to run, you can expect the sensor to run consistently around every 30 seconds. However, if the evaluation function takes longer, the interval between evaluations will be longer.
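The minimum-interval behavior described above can be modeled outside of Dagster with a small helper (hypothetical, for illustration only, not part of the Dagster API): the next evaluation begins at the later of the previous start plus the minimum interval, or the moment the previous evaluation finished.

```python
def next_evaluation_start(
    prev_start: float, eval_duration: float, minimum_interval: float
) -> float:
    """Illustrative model of when the next sensor evaluation can begin.

    The next tick waits for the minimum interval, but a slow evaluation
    pushes the next tick back to whenever the current one finishes.
    """
    return max(prev_start + minimum_interval, prev_start + eval_duration)


# Fast evaluation (1s): the next tick honors the 30s minimum interval
print(next_evaluation_start(0.0, 1.0, 30.0))   # 30.0

# Slow evaluation (45s): the next tick is delayed until it finishes
print(next_evaluation_start(0.0, 45.0, 30.0))  # 45.0
```

This is why `minimum_interval_seconds` is a floor on the time between evaluations, not a guaranteed schedule.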
+
+## Preventing duplicate runs
+
+To prevent duplicate runs, you can use run keys to uniquely identify each `RunRequest`. In the [previous example](#basic-sensor), the `RunRequest` was constructed with a `run_key`:
-By default, sensors aren't enabled when first deployed to a Dagster instance.
-Click "Automation" in the top navigation to find and enable a sensor.
+
+```
+yield dg.RunRequest(run_key=filename)
+```
+
+For a given sensor, a single run is created for each `RunRequest` with a unique `run_key`. Dagster skips requests with previously used run keys, ensuring that duplicate runs won't be created.
+
+## Cursors and high-volume events
+
+When dealing with a large number of events, you may want to implement a cursor to optimize sensor performance. Unlike run keys, cursors allow you to implement custom logic that manages state.
+
+The following example demonstrates how you might use a cursor to only create `RunRequest`s for files in a directory that have been updated since the last time the sensor ran.
+
+For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over multiple streams.
+
+:::note
+The preceding example uses both a `run_key` and a cursor, which means that if the cursor is reset but the files don't change, new runs won't be launched. This is because the run keys associated with the files won't change.
+
+If you want to be able to reset a sensor's cursor, don't set `run_key`s on `RunRequest`s.
+:::
+
+## Next steps
+
+By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints.
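For the multi-stream case mentioned above, one common approach (a sketch, not an API prescribed by Dagster) is to round-trip a small dict through JSON, since the cursor is just an opaque string; the stream names below are hypothetical:

```python
import json
from typing import Optional


def serialize_cursor(per_stream_state: dict) -> str:
    # The sensor cursor is an opaque string, so JSON is a convenient encoding
    return json.dumps(per_stream_state)


def deserialize_cursor(cursor: Optional[str]) -> dict:
    # On the first evaluation, no cursor has been stored yet
    return json.loads(cursor) if cursor else {}


# Track independent progress through two hypothetical event streams
state = deserialize_cursor(None)
state["bucket_mtime"] = 1700000000.0  # last file mtime seen in a bucket
state["db_row_id"] = 42               # last database row processed
cursor = serialize_cursor(state)

# The next evaluation picks up where each stream left off
restored = deserialize_cursor(cursor)
print(restored["db_row_id"])  # 42
```

Inside a sensor, the serialized string would be stored with `context.update_cursor(...)` and read back from `context.cursor` on the next evaluation.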
+
+- Run pipelines on a [schedule](/guides/schedules)
+- Trigger cross-job dependencies with [asset sensors](/guides/asset-sensors)
+- Explore [Declarative Automation](/concepts/automation/declarative-automation) as an alternative to sensors
\ No newline at end of file
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/sensor-cursor.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/sensor-cursor.py
new file mode 100644
index 0000000000000..f45f741d3098c
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/sensor-cursor.py
@@ -0,0 +1,56 @@
+import os
+
+import dagster as dg
+
+MY_DIRECTORY = "data"
+
+
+class FileConfig(dg.Config):
+    filename: str
+
+
+@dg.asset
+def my_asset(context: dg.AssetExecutionContext, config: FileConfig):
+    context.log.info(f"Processing {config.filename}")
+
+
+my_job = dg.define_asset_job("my_job", selection=[my_asset])
+
+
+@dg.sensor(
+    job=my_job,
+    minimum_interval_seconds=5,
+    default_status=dg.DefaultSensorStatus.RUNNING,
+)
+# highlight-start
+# Accept the sensor evaluation context
+def updated_file_sensor(context: dg.SensorEvaluationContext):
+    # Get the current cursor value from the sensor context
+    last_mtime = float(context.cursor) if context.cursor else 0
+    # highlight-end
+
+    max_mtime = last_mtime
+
+    # Loop through the directory
+    for filename in os.listdir(MY_DIRECTORY):
+        filepath = os.path.join(MY_DIRECTORY, filename)
+        if os.path.isfile(filepath):
+            # Get the file's last modification time (st_mtime)
+            fstats = os.stat(filepath)
+            file_mtime = fstats.st_mtime
+
+            # Skip files that haven't been updated since the last evaluation
+            if file_mtime <= last_mtime:
+                continue
+
+            # Construct the RunRequest with a run_key and run config
+            run_key = f"{filename}:{file_mtime}"
+            run_config = {"ops": {"my_asset": {"config": {"filename": filename}}}}
+            yield dg.RunRequest(run_key=run_key, run_config=run_config)
+
+            # highlight-start
+            # Keep the larger of max_mtime and the file's last updated time
+            max_mtime = max(max_mtime, file_mtime)
+
+    # Update the cursor
+    context.update_cursor(str(max_mtime))
+    # highlight-end
+
+
+defs = dg.Definitions(assets=[my_asset], jobs=[my_job], sensors=[updated_file_sensor])
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/simple-sensor-example.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/simple-sensor-example.py
index 64fbf6ee17702..7e2c4646c43b6 100644
--- a/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/simple-sensor-example.py
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/automation/simple-sensor-example.py
@@ -4,27 +4,37 @@
 import dagster as dg
 
 
+# Define the asset
 @dg.asset
 def my_asset(context: dg.AssetExecutionContext):
     context.log.info("Hello, world!")
 
 
+# Define the asset job
 my_job = dg.define_asset_job("my_job", selection=[my_asset])
 
 
 # highlight-start
+# Define the file check
 def check_for_new_files() -> List[str]:
     if random.random() > 0.5:
         return ["file1", "file2"]
     return []
 
 
-@dg.sensor(job=my_job, minimum_interval_seconds=5)
+# Define the sensor
+@dg.sensor(
+    job=my_job,
+    minimum_interval_seconds=5,
+    default_status=dg.DefaultSensorStatus.RUNNING,  # Sensor is turned on by default
+)
 def new_file_sensor():
     new_files = check_for_new_files()
+    # If there are new files, run `my_job`
     if new_files:
         for filename in new_files:
             yield dg.RunRequest(run_key=filename)
+    # If there are no new files, skip the run and log the reason
     else:
         yield dg.SkipReason("No new files found")
 # highlight-end
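The cursor bookkeeping in `updated_file_sensor` above can be factored into a plain, Dagster-free function for easier reasoning and testing (a hypothetical helper for illustration, not part of the snippet files in this PR):

```python
import os
from typing import List, Optional, Tuple


def updated_files_since(
    directory: str, cursor: Optional[str]
) -> Tuple[List[str], str]:
    """Return (filenames updated since the cursor, new cursor value).

    Mirrors the sensor's mtime-based cursor logic: the cursor stores the
    largest file modification time seen so far, serialized as a string.
    """
    last_mtime = float(cursor) if cursor else 0.0
    max_mtime = last_mtime
    updated = []
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            file_mtime = os.stat(filepath).st_mtime
            if file_mtime <= last_mtime:
                continue  # already seen during a previous evaluation
            updated.append(filename)
            max_mtime = max(max_mtime, file_mtime)
    return updated, str(max_mtime)
```

A first call with `cursor=None` reports every file in the directory; calling again with the returned cursor reports nothing until some file's mtime advances past it.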