Skip to content

Commit

Permalink
[doc-revamp] - Clean up partitioning guide (#24399)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR cleans up the Partitioning guide and splits it into two pages:
one for partitioning assets, one for defining dependencies.

@yuhan I know parts of this are still in the works - this PR is mostly
for cleanup in the definition portion, and an RFC for the page split.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
erinkcochran87 authored Sep 11, 2024
1 parent 0f942c4 commit 53ae8c5
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 98 deletions.
75 changes: 75 additions & 0 deletions docs/docs-beta/docs/guides/partition-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
title: Defining dependencies between partitioned assets
description: Learn how to define dependencies between partitioned and unpartitioned assets in Dagster.
sidebar_label: Partition dependencies
sidebar_position: 31
---

Now that you've seen how to model partitioned assets in different ways, you may want to define dependencies between the partitioned assets, or even between unpartitioned assets.

Partitioned assets in Dagster can have dependencies on other partitioned assets, allowing you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:

- A downstream asset can depend on one or more partitions of an upstream asset
- The partitioning schemes don't need to be identical, but they should be compatible

---

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- Familiarity with [Assets](/guides/data-assets)
- Familiarity with [Partitions](/guides/partitioning)

</details>

## Dependencies between different time-based partitions \{#different-time-dependencies}

The [time-partitioned asset example](#time-based) created two partitions: `daily_sales_data` and `daily_sales_summary`, which can be executed at the same time in a single schedule.

However, sometimes you might want to define dependencies between different time-based partitions. For example, you might want to aggregate daily data into a weekly report.

Consider the following example:

<CodeExample filePath="guides/data-modeling/partitioning/time_based_partition_dependencies.py" language="python" />

In this example:

- We have a `daily_sales_data` asset partitioned by day, which will be executed daily.
- The `weekly_sales_summary` asset depends on the `daily_sales_data` asset, which will be executed weekly.

- In this asset, the weekly partition depends on all its parent partitions (all seven days of the week). We use `context.asset_partition_key_range_for_input("daily_sales_data")` to get a range of partition keys, which includes the start and end of the week.

- To automate the execution of these assets:

- First, we specify `automation_condition=AutomationCondition.eager()` to the `weekly_sales_summary` asset. This ensures it runs weekly after all seven daily partitions of `daily_sales_data` are up-to-date.
- Second, we specify `automation_condition=AutomationCondition.cron(cron_schedule="0 1 * * *")` to the `daily_sales_data` asset. This ensures it runs daily.

Note: In [a simpler example](#define-time-partitioned-assets) above, we manually set up a daily schedule for asset execution. For more complex dependency logic, it's recommended to use automation conditions instead of schedules. Automation conditions specify when an asset should run, which allows you to define execution criteria without custom scheduling logic. For more details, see [Declarative Automation](/concepts/automation/declarative-automation).

## Dependencies between time-based partitions and un-partitioned assets

TODO

## Dependencies between time-based and static partitions

Combining time-based and static partitions allows you to analyze data across both temporal and categorical dimensions. This is particularly useful for scenarios like regional time series analysis.

{/* TODO */}

## Dependencies between time-based and dynamic partitions

{/* TODO */}

## Dependencies between time-based partitions and un-partitioned assets

{/* TODO */}

## Integrating Dagster partitions with external systems: incremental models and dbt

{/* TODO */}

## Next steps

- Go deeper into [Understanding Partitioning](#)
121 changes: 28 additions & 93 deletions docs/docs-beta/docs/guides/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ sidebar_position: 30

In Dagster, partitioning is a powerful technique for managing large datasets, improving pipeline performance, and enabling incremental processing. This guide will help you understand how to implement data partitioning in your Dagster projects.

There are several ways to partition your data in Dagster:

- [Time-based partitioning](#time-based-based), for processing data in specific time intervals
- [Static partitioning](#static-partitions), for dividing data based on predefined categories
- [Two-dimensional partitioning](#two-dimensional-partitions), for partitioning data along two different axes simultaneously
- [Dynamic partitioning](#dynamic-partitions), for creating partitions based on runtime information

## What you'll learn

- How to define partitions for Dagster assets and jobs
Expand All @@ -20,124 +27,52 @@ In Dagster, partitioning is a powerful technique for managing large datasets, im

To follow the steps in this guide, you'll need:

- A basic understanding of Dagster and assets. See the [Quick Start](/getting-started/quickstart) tutorial for an overview.
- Familiarity with [Assets](/guides/data-assets)

</details>

## Define partitioned assets

There are several ways to partition your data in Dagster:

- [Time-based partitioning](#define-time-partitioned-assets), for processing data in specific time intervals
- [Static partitioning](#define-partitions-with-predefined-categories), for dividing data based on predefined categories
- [Two-dimensional partitioning](#define-two-dimensional-partitions), for partitioning data along two different axes simultaneously
- [Dynamic partitioning](#define-partitions-with-dynamic-categories), for creating partitions based on runtime information

### Define time-partitioned assets

A common use case for partitioning is to process data that can be divided into time intervals, such as daily logs or monthly reports. Here's how to implement time-based partitioning in Dagster:
## Time-based partitions \{#time-based}

<CodeExample filePath="guides/data-modeling/partitioning/time_based_partitioning.py" language="python" title="Time-based partitioning" />

In this example:
A common use case for partitioning is to process data that can be divided into time intervals, such as daily logs or monthly reports.

- We defined `daily_partitions` using `DailyPartitionsDefinition` with a start date of "2024-01-01". This will create a range of partitions from "2024-01-01" to the day before the current time.
- The `daily_sales_data` asset is defined with this partitioning scheme.
- The `daily_sales_summary` asset depends on `daily_sales_data` and also uses the same partitioning scheme.
- The schedule `daily_sales_schedule` runs the job daily at 1:00 AM UTC and processes the previous day's data.
<CodeExample filePath="guides/data-modeling/partitioning/time_based_partitioning.py" language="python" />

### Define partitions with predefined categories
## Partitions with predefined categories \{#static-partitions}

Sometimes you have a set of predefined categories for your data. For instance, you might want to process data separately for different regions.

<CodeExample filePath="guides/data-modeling/partitioning/static_partitioning.py" language="python" title="Static partitioning" />

In this example:

- We defined `region_partitions` using `StaticPartitionsDefinition` with a list of regions.
- The `regional_sales_data` and `daily_sales_summary` are defined with the same partitioning scheme.
<CodeExample filePath="guides/data-modeling/partitioning/static_partitioning.py" language="python" />

{/* TODO: Link to Backfill page to explain how to backfill regional sales data */}

### Define two-dimensional partitions

Two-dimensional partitioning allows you to partition your data along two different axes simultaneously. This is useful when you need to process data that can be categorized in multiple ways. Here's an example of how to implement two-dimensional partitioning in Dagster:

<CodeExample filePath="guides/data-modeling/partitioning/two_dimensional_partitioning.py" language="python" title="Two-dimensional partitioning" />
## Two-dimensional partitions \{#two-dimensional-partitions}

In this example:

- We defined `two_dimensional_partitions` using `MultiPartitionsDefinition` with two dimensions: `date` and `region`.
- The partition key would be: `2024-08-01|us`.
- The `daily_regional_sales_data` and `daily_regional_sales_summary` assets are defined with the same two-dimensional partitioning scheme.
- The `daily_regional_sales_schedule` runs daily at 1:00 AM, processing the previous day's data for all regions. It uses `MultiPartitionKey` to specify partition keys for both date and region dimensions, resulting in 3 runs per day (one for each region).

### Define partitions with dynamic categories
Two-dimensional partitioning allows you to partition data along two different axes simultaneously. This is useful when you need to process data that can be categorized in multiple ways. For example:

Sometimes you don't know the partitions in advance. For example, you might want to process regions that are added in your system. In such cases, you can use dynamic partitioning to create partitions based on runtime information.

<CodeExample filePath="guides/data-modeling/partitioning/dynamic_partitioning.py" language="python" title="Dynamic partitioning" />
<CodeExample filePath="guides/data-modeling/partitioning/two_dimensional_partitioning.py" language="python" />

In this example:

- We defined `region_partitions` using `DynamicPartitionsDefinition` without knowing the values in advance.
- The `all_regions_sensor` is a sensor that will dynamically add all regions to the partition set. Once it kicks off runs, it will dynamically kick off runs for all regions (in this example, 6 times; one for each region).
- Using `MultiPartitionsDefinition`, the `two_dimensional_partitions` is defined with two dimensions: `date` and `region`
- The partition key would be: `2024-08-01|us`
- The `daily_regional_sales_data` and `daily_regional_sales_summary` assets are defined with the same two-dimensional partitioning scheme
- The `daily_regional_sales_schedule` runs daily at 1:00 AM, processing the previous day's data for all regions. It uses `MultiPartitionKey` to specify partition keys for both date and region dimensions, resulting in three runs per day, one for each region.

## Define dependencies between partitioned assets
## Partitions with dynamic categories \{#dynamic-partitions}

Now that you've seen how to model partitioned assets in different ways, this section shows how to define dependencies between various partitioned assets, and between partitioned assets and un-partitioned assets.
Sometimes you don't know the partitions in advance. For example, you might want to process new regions that are added in your system. In these cases, you can use dynamic partitioning to create partitions based on runtime information.

Partitioned assets in Dagster can have dependencies on other partitioned assets. This allows you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:
Consider this example:

- A downstream asset can depend on one or more partitions of an upstream asset
- The partitioning schemes don't need to be identical, but they should be compatible

### Dependencies between different time-based partitions

In [Define time-partitioned assets](#define-time-partitioned-assets), we created two time-partitioned assets: `daily_sales_data` and `daily_sales_summary`, which can be executed at the same time in a single schedule.

However, sometimes you might want to define dependencies between different time-based partitions. For example, you might want to aggregate daily data into a weekly report.

In this case, we have a `weekly_sales_summary` asset that depends on the `daily_sales_data` asset. Here's how to set up dependencies between different time-based partitions:
<CodeExample filePath="guides/data-modeling/partitioning/dynamic_partitioning.py" language="python" title="Dynamic partitioning" />

<CodeExample filePath="guides/data-modeling/partitioning/time_based_partition_dependencies.py" language="python" title="Time-based partition dependencies" />
Because the partition values are unknown in advance, `DynamicPartitionsDefinition` is used to define the partition. Then, the `all_regions_sensor`

In this example:

- We have a `daily_sales_data` asset partitioned by day, which will be executed daily.
- The `weekly_sales_summary` asset depends on the `daily_sales_data` asset, which will be executed weekly.

- In this asset, the weekly partition depends on all its parent partitions (all seven days of the week). We use `context.asset_partition_key_range_for_input("daily_sales_data")` to get a range of partition keys, which includes the start and end of the week.

- To automate the execution of these assets:

- First, we specify `automation_condition=AutomationCondition.eager()` to the `weekly_sales_summary` asset. This ensures it runs weekly after all seven daily partitions of `daily_sales_data` are up-to-date.
- Second, we specify `automation_condition=AutomationCondition.cron(cron_schedule="0 1 * * *")` to the `daily_sales_data` asset. This ensures it runs daily.

Note: In [a simpler example](#define-time-partitioned-assets) above, we manually set up a daily schedule for asset execution. For more complex dependency logic, it's recommended to use automation conditions instead of schedules. Automation conditions specify when an asset should run, which allows you to define execution criteria without custom scheduling logic. For more details, see [Declarative Automation](/concepts/automation/declarative-automation).

### Dependencies between time-based partitions and un-partitioned assets

TODO

### Dependencies between time-based and static partitions

Combining time-based and static partitions allows you to analyze data across both temporal and categorical dimensions. This is particularly useful for scenarios like regional time series analysis.

{/* TODO */}

### Dependencies between time-based and dynamic partitions

{/* TODO */}

### Dependencies between time-based partitions and un-partitioned assets

{/* TODO */}

## Integrating Dagster partitions with external systems: incremental models and dbt

{/* TODO */}
- Because the partition values are unknown in advance, `DynamicPartitionsDefinition` is used to define `region_partitions`
- When triggered, the `all_regions_sensor` will dynamically add all regions to the partition set. Once it kicks off runs, it will dynamically kick off runs for all regions. In this example, that would be six times; one for each region.

## Next steps

- Go deeper into [Understanding Partitioning](#)
- TODOD: Partition dependencies
1 change: 1 addition & 0 deletions docs/docs-beta/sidebars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const sidebars: SidebarsConfig = {
'guides/data-assets',
'guides/metadata',
'guides/partitioning',
'guides/partition-dependencies',
'guides/external-assets',
],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


# Define the partitioned asset
@dg.asset(partitions_def=region_partitions)
@dg.asset(partitions_def=region_partitions) # Use the region partitioning scheme
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
region = context.partition_key

Expand All @@ -29,7 +29,7 @@ def regional_sales_data(context: dg.AssetExecutionContext) -> None:


@dg.asset(
partitions_def=region_partitions,
partitions_def=region_partitions, # Use the region partitioning scheme
deps=[regional_sales_data],
)
def daily_sales_summary(context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import dagster as dg

# Create the PartitionDefinition
# Create the PartitionDefinition,
# which will create a range of partitions from
# 2024-01-01 to the day before the current time
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


Expand All @@ -29,8 +31,8 @@ def daily_sales_data(context: dg.AssetExecutionContext) -> None:


@dg.asset(
partitions_def=daily_partitions,
deps=[daily_sales_data],
partitions_def=daily_partitions, # Use the daily partitioning scheme
deps=[daily_sales_data], # Define dependency on `daily_sales_data` asset
)
def daily_sales_summary(context):
partition_date_str = context.partition_key
Expand Down

0 comments on commit 53ae8c5

Please sign in to comment.