From 4e1a1c9931552e4c1d21f4430882d2af590985e9 Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Thu, 15 Aug 2024 21:08:39 -0700 Subject: [PATCH] Add Automation Page (#23659) This adds a "How to Automate Pipelines" guide. The idea behind this page is that it serves as the entry point for Automation. We explain the three ways to automate, each example offers links to a How-To guide with examples for that concept, and potentially a concept page. Not every idea needs a concept page. For example, Asset Sensors may only need a Guide and Reference API documentation. I did not include the section on Declarative Automation, but it would also fall here. --- .github/workflows/build-docs-revamp.yml | 2 - docs/docs-next/docs/concepts/sensors.md | 0 docs/docs-next/docs/guides/automation.md | 107 ++++-------------- .../docs/guides/automation/asset-sensors.md | 10 ++ ...ynamic-pipelines-based-on-external-data.md | 4 +- .../running-pipelines-on-a-schedule.md | 6 - .../docs/guides/automation/schedules.md | 74 ++++++++++++ .../docs/guides/automation/sensors.md | 18 +++ .../automation/simple-asset-sensor-example.py | 36 ++++++ .../automation/simple-schedule-example.py | 25 ++++ .../automation/simple-sensor-example.py | 26 ++++- .../triggering-pipeline-runs-using-events.md | 6 +- .../Dagster/section-heading-sentence-case.yml | 5 + .../config/vocabularies/Dagster/accept.txt | 2 + 14 files changed, 217 insertions(+), 104 deletions(-) create mode 100644 docs/docs-next/docs/concepts/sensors.md create mode 100644 docs/docs-next/docs/guides/automation/asset-sensors.md delete mode 100644 docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md create mode 100644 docs/docs-next/docs/guides/automation/sensors.md create mode 100644 docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py create mode 100644 docs/docs-next/docs/guides/automation/simple-schedule-example.py create mode 100644 
docs/vale/styles/Dagster/section-heading-sentence-case.yml diff --git a/.github/workflows/build-docs-revamp.yml b/.github/workflows/build-docs-revamp.yml index 7d63950f0ae47..4af9f18c06fe0 100644 --- a/.github/workflows/build-docs-revamp.yml +++ b/.github/workflows/build-docs-revamp.yml @@ -4,7 +4,6 @@ on: paths: - docs/docs-next - .github/workflows/build-docs-revamp.yml - push: branches: - docs/revamp @@ -13,7 +12,6 @@ on: - .github/workflows/build-docs-revamp.yml concurrency: - # Cancel in-progress runs on same branch group: ${{ github.workflow}}-${{github.ref}} cancel-in-progress: true diff --git a/docs/docs-next/docs/concepts/sensors.md b/docs/docs-next/docs/concepts/sensors.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/docs-next/docs/guides/automation.md b/docs/docs-next/docs/guides/automation.md index 099aa8ea65642..43a33ac23c798 100644 --- a/docs/docs-next/docs/guides/automation.md +++ b/docs/docs-next/docs/guides/automation.md @@ -6,9 +6,9 @@ last_update: author: Pedram Navid --- -# How To Automate Pipelines in Dagster - -Automation is key to building reliable, efficient data pipelines. This guide covers the main ways to automate processes in Dagster, helping you choose the right method for your needs. +Automation is key to building reliable, efficient data pipelines. +This guide provides a simplified overview of the main ways to automate processes in Dagster, +helping you choose the right method for your needs. You will find links to more detailed guides for each method below. ## What You'll Learn @@ -30,105 +30,49 @@ Before continuing, you should be familiar with: Dagster offers several ways to automate pipeline execution: -1. Schedules - Run jobs at specified times -2. Sensors - Trigger runs based on events -3. Declarative Automation - Automatically materialize assets based on conditions -4. Asset Sensors - Trigger jobs when specific assets materialize +1. [Schedules](#schedules) - Run jobs at specified times +2. 
[Sensors](#sensors) - Trigger runs based on events +3. [Asset Sensors](#asset-sensors) - Trigger jobs when specific assets materialize Let's look at each method in more detail. ## Schedules Schedules allow you to run jobs at specified times, like "every Monday at 9 AM" or "daily at midnight." -A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a cron expression in order to define when the job should be run. +A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a [cron expression](https://en.wikipedia.org/wiki/Cron) +in order to define when the job should be run. + To make creating cron expressions easier, you can use an online tool like [Crontab Guru](https://crontab.guru/). ### When to use Schedules - You need to run jobs at regular intervals -- You want a basic time-based automation method - -### Basic Schedule Example - -```python -from dagster import ScheduleDefinition, define_asset_job - -# A job is a selection of assets that are grouped together for execution -daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"]) +- You want basic time-based automation -# Create a schedule that runs the job daily at midnight -daily_schedule = ScheduleDefinition( - job=daily_refresh_job, - cron_schedule="0 0 * * *" # Runs at midnight daily -) -``` +For examples of how to create schedules, see the [How-To Use Schedules](/guides/automation/schedules) guide. -View more detailed examples of schedules in the [How-To Use Schedules](/guides/automation/schedules) -and read more about how Schedules work in [About Schedules](/concepts/schedules). +For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page. ## Sensors Sensors allow you to trigger runs based on events or conditions, like a new file arriving or an external system status change. 
-A sensor requires that you define a function that will +Like schedules, sensors operate on a selection of assets, known as [Jobs](/concepts/ops-jobs) and can either start a pipeline +through a Run or log a reason for not starting a pipeline using a SkipReason. + +However, unlike schedules, sensors are triggered by events that you define. +You must provide a function that the sensor will use to determine if it should trigger a run. ### When to use Sensors - You need event-driven automation - You want to react to changes in external systems -### Basic Sensor Example - -```python -from dagster import RunRequest, SensorDefinition, sensor - -@asset -def my_asset(): - ... - -my_job = define_asset_job("my_job", selection=[my_asset]) - -def check_for_new_files() -> List[str]: - return ["file1", "file2"] +For more examples of how to create sensors, see the [How-To Use Sensors](/guides/automation/sensors) guide. -@sensor(job=my_job) -def new_file_sensor(): - new_files = check_for_new_files() - if new_files: - yield RunRequest(run_key=f"filename") +For more information about how Sensors work, see the [About Sensors](/concepts/sensors) concept page. -``` - -## 3. Declarative Automation - -Declarative Automation allows you to automatically materialize assets when specified criteria are met, without needing to define explicit jobs. - -### When to use Declarative Automation - -- You're working primarily with assets -- You want a simpler, more declarative approach to automation - -### Basic Declarative Automation Example - -```python -from dagster import asset, AutoMaterializePolicy, AutoMaterializeRule - -@asset( - auto_materialize_policy=AutoMaterializePolicy( - rules=[ - # Materialize if upstream assets have changed - AutoMaterializeRule.materialize_on_parent_updated(), - # Materialize daily at 2 AM - AutoMaterializeRule.materialize_on_cron("0 2 * * *"), - ] - ) -) -def my_asset(): - # Asset computation logic here - pass -``` - -## 4. 
Asset Sensors +## Asset Sensors Asset Sensors trigger jobs when specified assets are materialized, allowing you to create dependencies between jobs or code locations. @@ -137,15 +81,8 @@ Asset Sensors trigger jobs when specified assets are materialized, allowing you - You need to trigger jobs based on asset materializations - You want to create dependencies between different jobs or code locations -### Basic Asset Sensor Example - -```python -from dagster import AssetSensor, RunRequest, asset_sensor +For more examples of how to create asset sensors, see the [How-To Use Asset Sensors](/guides/automation/asset-sensors) guide. -@asset_sensor(asset_key=["raw_data"], job=process_raw_data_job) -def raw_data_sensor(context): - yield RunRequest(run_key=context.cursor) -``` ## Choosing the Right Automation Method @@ -171,4 +108,4 @@ Use this table to help guide your decision: - Explore [complex sensor examples] - TODO ADD LINK - Dive into [Declarative Automation best practices] - TODO ADD LINK -By understanding and effectively using these automation methods, you can build robust, efficient data pipelines that respond to your specific needs and constraints. \ No newline at end of file +By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints. \ No newline at end of file diff --git a/docs/docs-next/docs/guides/automation/asset-sensors.md b/docs/docs-next/docs/guides/automation/asset-sensors.md new file mode 100644 index 0000000000000..1d928c3cc59bc --- /dev/null +++ b/docs/docs-next/docs/guides/automation/asset-sensors.md @@ -0,0 +1,10 @@ +--- +title: Asset Sensors +sidebar_position: 50 +--- + +### Basic Asset Sensor Example + + + +This Asset Sensor will trigger a run of `my_job` whenever the `asset_to_watch` asset is materialized. 
diff --git a/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md b/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md index 6aa876233cae2..98576d9d32794 100644 --- a/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md +++ b/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md @@ -1,6 +1,6 @@ --- title: "Creating dynamic pipelines based on external data" -sidebar_position: 3 +sidebar_position: 30 --- -# Creating dynamic pipelines based on external data \ No newline at end of file +# Creating dynamic pipelines based on external data diff --git a/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md b/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md deleted file mode 100644 index c99994d5c9a78..0000000000000 --- a/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -title: "Scheduling runs" -sidebar_position: 1 ---- - -# Scheduling runs \ No newline at end of file diff --git a/docs/docs-next/docs/guides/automation/schedules.md b/docs/docs-next/docs/guides/automation/schedules.md index e69de29bb2d1d..db5105529916e 100644 --- a/docs/docs-next/docs/guides/automation/schedules.md +++ b/docs/docs-next/docs/guides/automation/schedules.md @@ -0,0 +1,74 @@ +--- +title: "Scheduling pipelines" +sidebar_label: "Running pipelines on a schedule" +sidebar_position: 10 +--- + +## Basic Schedule Example + +A basic schedule is defined by a `JobDefinition` and a `cron_schedule` using the `ScheduleDefinition` class. + + + +## How to Set Custom Timezones + +By default, schedules without a timezone will run in UTC. If you want to run a schedule in a different timezone, you can +set the `timezone` parameter. 
+ +```python +ecommerce_schedule = ScheduleDefinition( + job=ecommerce_job, + cron_schedule="15 5 * * 1-5", + timezone="America/Los_Angeles", +) +``` + +## How to Create Partitioned Schedules + +If you have a partitioned asset and job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`. +The schedule will execute at the same cadence specified by the partition definition. + +```python +from dagster import ( + asset, + build_schedule_from_partitioned_job, + define_asset_job, + DailyPartitionsDefinition, +) + +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") + + +@asset(partitions_def=daily_partition) +def daily_asset(): ... + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) + +# highlight-start +# This schedule will run daily +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, +) +# highlight-end + +``` + +If you have a partitioned job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`. + +```python +from dagster import build_schedule_from_partitioned_job, job + + +@job(config=partitioned_config) +def partitioned_op_job(): ... + +# highlight-start +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, +) +# highlight-end +``` + +--- + +For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page. diff --git a/docs/docs-next/docs/guides/automation/sensors.md b/docs/docs-next/docs/guides/automation/sensors.md new file mode 100644 index 0000000000000..fcaa64aa5be7c --- /dev/null +++ b/docs/docs-next/docs/guides/automation/sensors.md @@ -0,0 +1,18 @@ +--- +title: Sensor Examples +--- + +### Basic Sensor Example + +This example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory. + +The sensor runs every 5 seconds.
If it finds new files, it starts a run of `my_job`. If not, it skips the run and logs "No new files found" in the Dagster UI. + + + +:::tip + +By default, sensors aren't enabled when first deployed to a Dagster instance. +Click "Automation" in the top navigation to find and enable a sensor. + +::: diff --git a/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py b/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py new file mode 100644 index 0000000000000..3432bb604d190 --- /dev/null +++ b/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py @@ -0,0 +1,36 @@ +from dagster import ( + AssetExecutionContext, + AssetKey, + Definitions, + RunRequest, + asset, + asset_sensor, + define_asset_job, +) + + +@asset +def asset_to_watch(context: AssetExecutionContext): + context.log.info("Asset to watch") + + +@asset +def asset_to_trigger(context: AssetExecutionContext): + context.log.info("Asset to trigger") + + +my_job = define_asset_job("my_job", [asset_to_trigger]) + + +# highlight-start +@asset_sensor(asset_key=AssetKey("asset_to_watch"), job_name="my_job") +def my_asset_sensor(): + yield RunRequest() + # highlight-end + + +defs = Definitions( + assets=[asset_to_watch, asset_to_trigger], + jobs=[my_job], + sensors=[my_asset_sensor], +) diff --git a/docs/docs-next/docs/guides/automation/simple-schedule-example.py b/docs/docs-next/docs/guides/automation/simple-schedule-example.py new file mode 100644 index 0000000000000..33ec3e6b791e4 --- /dev/null +++ b/docs/docs-next/docs/guides/automation/simple-schedule-example.py @@ -0,0 +1,25 @@ +from dagster import Definitions, ScheduleDefinition, asset, define_asset_job + + +@asset +def customer_data(): ... + + +@asset +def sales_report(): ... 
+ + +daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"]) + +# highlight-start +daily_schedule = ScheduleDefinition( + job=daily_refresh_job, + cron_schedule="0 0 * * *", # Runs at midnight daily +) +# highlight-end + +defs = Definitions( + assets=[customer_data, sales_report], + jobs=[daily_refresh_job], + schedules=[daily_schedule], +) diff --git a/docs/docs-next/docs/guides/automation/simple-sensor-example.py b/docs/docs-next/docs/guides/automation/simple-sensor-example.py index 1f7210d32984c..e6011a6d4ecea 100644 --- a/docs/docs-next/docs/guides/automation/simple-sensor-example.py +++ b/docs/docs-next/docs/guides/automation/simple-sensor-example.py @@ -1,6 +1,15 @@ +import random from typing import List -from dagster import AssetExecutionContext, Definitions, RunRequest, asset, define_asset_job, sensor +from dagster import ( + AssetExecutionContext, + Definitions, + RunRequest, + SkipReason, + asset, + define_asset_job, + sensor, +) @asset @@ -11,15 +20,22 @@ def my_asset(context: AssetExecutionContext): my_job = define_asset_job("my_job", selection=[my_asset]) +# highlight-start def check_for_new_files() -> List[str]: - return ["file1", "file2"] + if random.random() > 0.5: + return ["file1", "file2"] + return [] -@sensor(target=my_job) +@sensor(target=my_job, minimum_interval_seconds=5) def new_file_sensor(): new_files = check_for_new_files() - for filename in new_files: - yield RunRequest(run_key=filename) + if new_files: + for filename in new_files: + yield RunRequest(run_key=filename) + else: + yield SkipReason("No new files found") + # highlight-end defs = Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor]) diff --git a/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md b/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md index a22694860e3fa..17826f1c536e4 100644 --- 
a/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md +++ b/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md @@ -1,6 +1,4 @@ --- -title: "Triggering runs using events" -sidebar_position: 2 +title: "Creating event-based pipelines" +sidebar_position: 20 --- - -# Triggering pipeline runs using events \ No newline at end of file diff --git a/docs/vale/styles/Dagster/section-heading-sentence-case.yml b/docs/vale/styles/Dagster/section-heading-sentence-case.yml new file mode 100644 index 0000000000000..93244ad05e7c6 --- /dev/null +++ b/docs/vale/styles/Dagster/section-heading-sentence-case.yml @@ -0,0 +1,5 @@ +extends: capitalization +message: "'%s' should be in sentence case" +level: error +scope: heading +match: $sentence diff --git a/docs/vale/styles/config/vocabularies/Dagster/accept.txt b/docs/vale/styles/config/vocabularies/Dagster/accept.txt index 9ee817464ef1a..ca940b3243c3c 100644 --- a/docs/vale/styles/config/vocabularies/Dagster/accept.txt +++ b/docs/vale/styles/config/vocabularies/Dagster/accept.txt @@ -11,3 +11,5 @@ dataframe dataframes DataFrame cron +materializations +webserver \ No newline at end of file
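
Note on the cron expressions in this patch: the schedule examples use `0 0 * * *` (midnight daily) and `15 5 * * 1-5` (05:15 on weekdays). The sketch below is only an illustration of how the five fields are read. It is not how Dagster evaluates schedules, and `field_matches` and `cron_matches` are hypothetical helper names introduced here. It assumes plain numeric fields (`*`, `N`, `A-B`, and comma lists) and combines all five fields with AND, which differs from standard cron when both day-of-month and day-of-week are restricted.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Check one cron field ('*', 'N', 'A-B', or a comma list of those) against a value."""
    for part in field.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = part.split("-")
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if the datetime falls on a minute selected by the expression."""
    minute, hour, day_of_month, month, day_of_week = expression.split()
    # cron counts Sunday as 0, while datetime.weekday() counts Monday as 0
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day_of_month, moment.day)
        and field_matches(month, moment.month)
        and field_matches(day_of_week, cron_weekday)
    )


if __name__ == "__main__":
    thursday = datetime(2024, 8, 15, 5, 15)  # a Thursday
    saturday = datetime(2024, 8, 17, 5, 15)  # a Saturday
    print(cron_matches("15 5 * * 1-5", thursday))
    print(cron_matches("15 5 * * 1-5", saturday))
```

A tool such as Crontab Guru, which the guide already links to, remains the more reliable way to check a real expression, since it covers step values, names, and the day-of-month/day-of-week rule that this sketch leaves out.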