DOC-376 partitioning assets 1/ Define partitioned assets (#24124)
## Summary & Motivation

Ticket: https://linear.app/dagster-labs/issue/DOC-376/write-partitioning-assets

Outline discussed here: https://dagsterlabs.slack.com/archives/C07HNLRHFTR/p1724708451193039

This PR produces the first section, "Define partitioned assets", with fully self-contained examples.

Request for feedback: are the examples too long? I often find myself wondering about "how to kick off things from a sensor or schedule," so I included the sensor and schedule code in the examples as well. More diffs are coming in the stack for "Define dependencies between partitioned assets" and dbt-related content.

## How I Tested These Changes

https://dagster-docs-beta-aoe0uid9t-elementl.vercel.app/guides/data-modeling/partitioning

## Changelog

`NOCHANGELOG`
Showing 5 changed files with 428 additions and 2 deletions.
**docs/docs-beta/docs/guides/data-modeling/partitioning.md** (115 additions, 2 deletions)
---
title: Partitioning assets
description: Learn how to partition your data in Dagster.
sidebar_label: Partitioning assets
sidebar_position: 30
---

In Dagster, partitioning is a powerful technique for managing large datasets, improving pipeline performance, and enabling incremental processing. This guide will help you understand how to implement data partitioning in your Dagster projects.
## What you'll learn

- How to define partitions for Dagster assets and jobs
- How to establish dependencies between partitioned assets
- How to leverage Dagster partitions with external systems like dbt

---
<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- A basic understanding of Dagster and assets. See the [Quick Start](/tutorial/quick-start) tutorial for an overview.

</details>
## Define partitioned assets

There are several ways to partition your data in Dagster:

- [Time-based partitioning](#define-time-partitioned-assets), for processing data in specific time intervals
- [Static partitioning](#define-partitions-with-predefined-categories), for dividing data based on predefined categories
- [Two-dimensional partitioning](#define-two-dimensional-partitions), for partitioning data along two different axes simultaneously
- [Dynamic partitioning](#define-partitions-with-dynamic-categories), for creating partitions based on runtime information
### Define time-partitioned assets

A common use case for partitioning is to process data that can be divided into time intervals, such as daily logs or monthly reports. Here's how to implement time-based partitioning in Dagster:

<CodeExample filePath="guides/data-modeling/partitioning/time_based_partitioning.py" language="python" title="Time-based partitioning" />

In this example:

- We defined `daily_partitions` using `DailyPartitionsDefinition` with a start date of "2024-01-01". This creates a range of partitions from "2024-01-01" to the day before the current date.
- The `daily_sales_data` asset is defined with this partitioning scheme.
- The `daily_sales_summary` asset depends on `daily_sales_data` and uses the same partitioning scheme.
- The `daily_sales_schedule` schedule runs the job daily at 1:00 AM UTC and processes the previous day's data.
### Define partitions with predefined categories

Sometimes you have a set of predefined categories for your data. For instance, you might want to process data separately for different regions.

<CodeExample filePath="guides/data-modeling/partitioning/static_partitioning.py" language="python" title="Static partitioning" />

In this example:

- We defined `region_partitions` using `StaticPartitionsDefinition` with a list of regions.
- The `regional_sales_data` and `daily_sales_summary` assets are defined with the same partitioning scheme.

TODO: Link to Backfill page to explain how to backfill regional sales data
### Define two-dimensional partitions

Two-dimensional partitioning allows you to partition your data along two different axes simultaneously. This is useful when you need to process data that can be categorized in multiple ways. Here's an example of how to implement two-dimensional partitioning in Dagster:

<CodeExample filePath="guides/data-modeling/partitioning/two_dimensional_partitioning.py" language="python" title="Two-dimensional partitioning" />

In this example:

- We defined `two_dimensional_partitions` using `MultiPartitionsDefinition` with two dimensions: `date` and `region`.
- A partition key looks like `2024-08-01|us`.
- The `daily_regional_sales_data` and `daily_regional_sales_summary` assets are defined with the same two-dimensional partitioning scheme.
- The `daily_regional_sales_schedule` runs daily at 1:00 AM, processing the previous day's data for all regions. It uses `MultiPartitionKey` to specify partition keys for both the date and region dimensions, resulting in three runs per day (one for each region).
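The `two_dimensional_partitioning.py` snippet isn't visible in this diff, so here is a minimal sketch of what the described example might look like, reconstructed from the bullets above; the asset bodies and the exact region list are assumptions:

```python
import datetime

import dagster as dg

# Two dimensions: a daily date and a static region
two_dimensional_partitions = dg.MultiPartitionsDefinition(
    {
        "date": dg.DailyPartitionsDefinition(start_date="2024-01-01"),
        "region": dg.StaticPartitionsDefinition(["us", "eu", "jp"]),
    }
)


@dg.asset(partitions_def=two_dimensional_partitions)
def daily_regional_sales_data(context: dg.AssetExecutionContext) -> None:
    # The combined partition key looks like "2024-08-01|us"
    keys = context.partition_key.keys_by_dimension
    context.log.info(f"Fetching sales for {keys['date']} in {keys['region']}")


@dg.asset(
    partitions_def=two_dimensional_partitions,
    deps=[daily_regional_sales_data],
)
def daily_regional_sales_summary(context: dg.AssetExecutionContext) -> None:
    keys = context.partition_key.keys_by_dimension
    context.log.info(f"Summarizing {keys['date']} / {keys['region']}")


daily_regional_sales_job = dg.define_asset_job(
    name="daily_regional_sales_job",
    selection=[daily_regional_sales_data, daily_regional_sales_summary],
    partitions_def=two_dimensional_partitions,
)


# One run per region for the previous day: three runs per day
@dg.schedule(job=daily_regional_sales_job, cron_schedule="0 1 * * *")
def daily_regional_sales_schedule(context):
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return [
        dg.RunRequest(
            run_key=f"{date}|{region}",
            partition_key=dg.MultiPartitionKey({"date": date, "region": region}),
        )
        for region in ["us", "eu", "jp"]
    ]
```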
### Define partitions with dynamic categories

Sometimes you don't know the partitions in advance. For example, you might want to process new regions as they are added to your system. In such cases, you can use dynamic partitioning to create partitions based on runtime information.

<CodeExample filePath="guides/data-modeling/partitioning/dynamic_partitioning.py" language="python" title="Dynamic partitioning" />

In this example:

- We defined `region_partitions` using `DynamicPartitionsDefinition` without knowing the partition values in advance.
- The `all_regions_sensor` dynamically adds all regions to the partition set and kicks off a run for each region (in this example, six runs, one per region).
## Define dependencies between partitioned assets

Now that you've seen how to model partitioned assets in different ways, this section shows how to define dependencies between partitioned assets, and between partitioned and unpartitioned assets.
### Dependencies between time-based partitions

Partitioned assets in Dagster can depend on other partitioned assets. This allows you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:

- A downstream asset can depend on one or more partitions of an upstream asset
- The partitioning schemes don't need to be identical, but they should be compatible

TODO
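A minimal sketch of both cases in the list above, assuming daily partitions on both assets; the asset names are hypothetical:

```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(partitions_def=daily_partitions)
def events(context: dg.AssetExecutionContext) -> None: ...


# By default, each partition of the downstream asset depends on the
# same partition of the upstream asset.
@dg.asset(partitions_def=daily_partitions, deps=[events])
def daily_event_stats(context: dg.AssetExecutionContext) -> None: ...


# A partition mapping can shift the dependency window; here each day's
# partition depends on the previous day's partition of `events`.
@dg.asset(
    partitions_def=daily_partitions,
    deps=[
        dg.AssetDep(
            events,
            partition_mapping=dg.TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    ],
)
def day_over_day_comparison(context: dg.AssetExecutionContext) -> None: ...
```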
### Dependencies between time-based and static partitions

Combining time-based and static partitions allows you to analyze data across both temporal and categorical dimensions. This is particularly useful for scenarios like regional time series analysis.

TODO
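A minimal sketch of one way to wire this up, assuming a daily-partitioned upstream asset and a two-dimensional (date and region) downstream asset; the explicit `MultiToSingleDimensionPartitionMapping` names which dimension lines up with the upstream's partitions:

```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

date_region_partitions = dg.MultiPartitionsDefinition(
    {
        "date": daily_partitions,
        "region": dg.StaticPartitionsDefinition(["us", "eu", "jp"]),
    }
)


@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None: ...


# Each (date, region) partition depends on the matching date partition
# of the upstream asset.
@dg.asset(
    partitions_def=date_region_partitions,
    deps=[
        dg.AssetDep(
            daily_sales_data,
            partition_mapping=dg.MultiToSingleDimensionPartitionMapping(
                partition_dimension_name="date"
            ),
        )
    ],
)
def regional_time_series(context: dg.AssetExecutionContext) -> None:
    keys = context.partition_key.keys_by_dimension
    context.log.info(f"Analyzing {keys['region']} for {keys['date']}")
```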
### Dependencies between time-based and dynamic partitions

TODO
### Dependencies between time-based partitions and unpartitioned assets

TODO
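A minimal sketch, assuming a daily-partitioned upstream asset; by default, an unpartitioned downstream asset depends on all partitions of its partitioned upstream:

```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None: ...


# Unpartitioned: a single materialization reads across every
# daily partition of the upstream asset.
@dg.asset(deps=[daily_sales_data])
def all_time_sales_report() -> None: ...
```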
## Integrating Dagster partitions with external systems: incremental models and dbt

TODO
## Next steps

- Go deeper into [Understanding Partitioning](#)
**...eta_snippets/docs_beta_snippets/guides/data-modeling/partitioning/dynamic_partitioning.py** (74 additions)
```python
import os

import pandas as pd

import dagster as dg

# Create the partitions definition
region_partitions = dg.DynamicPartitionsDefinition(name="regions")


# Define the partitioned asset
@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key

    # Simulate fetching sales data for the region
    df = pd.DataFrame(
        {
            "region": [region] * 10,
            "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
        }
    )

    os.makedirs("regional_sales", exist_ok=True)
    filename = f"regional_sales/sales_{region}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Regional sales data written to {filename}")


@dg.asset(
    partitions_def=region_partitions,
    deps=[regional_sales_data],
)
def daily_sales_summary(context: dg.AssetExecutionContext):
    region = context.partition_key
    # Read the CSV file for the given region
    filename = f"regional_sales/sales_{region}.csv"
    df = pd.read_csv(filename)

    # Summarize the region's sales
    summary = {
        "region": region,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Regional sales summary for {region}: {summary}")


# Create a partitioned asset job
regional_sales_job = dg.define_asset_job(
    name="regional_sales_job",
    selection=[regional_sales_data, daily_sales_summary],
    partitions_def=region_partitions,
)


@dg.sensor(job=regional_sales_job)
def all_regions_sensor(context: dg.SensorEvaluationContext):
    # Simulate fetching all regions from an external system
    all_regions = ["us", "eu", "jp", "ca", "uk", "au"]

    return dg.SensorResult(
        run_requests=[dg.RunRequest(partition_key=region) for region in all_regions],
        dynamic_partitions_requests=[region_partitions.build_add_request(all_regions)],
    )


# Define the Definitions object
defs = dg.Definitions(
    assets=[regional_sales_data, daily_sales_summary],
    jobs=[regional_sales_job],
    sensors=[all_regions_sensor],
)
```
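For local experimentation, something like the following (not part of the PR) could materialize a single region in process; the `"us"` key and the test instance are assumptions for illustration:

```python
import dagster as dg

# Register a partition key on the instance, then materialize it in process
with dg.instance_for_test() as instance:
    instance.add_dynamic_partitions("regions", ["us"])
    dg.materialize(
        [regional_sales_data, daily_sales_summary],
        partition_key="us",
        instance=instance,
    )
```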
**...beta_snippets/docs_beta_snippets/guides/data-modeling/partitioning/static_partitioning.py** (62 additions)
```python
import os

import pandas as pd

import dagster as dg

# Create the partitions definition
region_partitions = dg.StaticPartitionsDefinition(["us", "eu", "jp"])


# Define the partitioned asset
@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key

    # Simulate fetching sales data for the region
    df = pd.DataFrame(
        {
            "region": [region] * 10,
            "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
        }
    )

    os.makedirs("regional_sales", exist_ok=True)
    filename = f"regional_sales/sales_{region}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Regional sales data written to {filename}")


@dg.asset(
    partitions_def=region_partitions,
    deps=[regional_sales_data],
)
def daily_sales_summary(context: dg.AssetExecutionContext):
    region = context.partition_key
    # Read the CSV file for the given region
    filename = f"regional_sales/sales_{region}.csv"
    df = pd.read_csv(filename)

    # Summarize the region's sales
    summary = {
        "region": region,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Regional sales summary for {region}: {summary}")


# Create a partitioned asset job
regional_sales_job = dg.define_asset_job(
    name="regional_sales_job",
    selection=[regional_sales_data, daily_sales_summary],
    partitions_def=region_partitions,
)


# Define the Definitions object
defs = dg.Definitions(
    assets=[regional_sales_data, daily_sales_summary],
    jobs=[regional_sales_job],
)
```
**..._snippets/docs_beta_snippets/guides/data-modeling/partitioning/time_based_partitioning.py** (79 additions)
```python
import datetime
import os

import pandas as pd

import dagster as dg

# Create the partitions definition
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Simulate fetching daily sales data
    df = pd.DataFrame(
        {
            "date": [date] * 10,
            "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
        }
    )

    os.makedirs("daily_sales", exist_ok=True)
    filename = f"daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
    partitions_def=daily_partitions,
    deps=[daily_sales_data],
)
def daily_sales_summary(context: dg.AssetExecutionContext):
    partition_date_str = context.partition_key
    # Read the CSV file for the given partition date
    filename = f"daily_sales/sales_{partition_date_str}.csv"
    df = pd.read_csv(filename)

    # Summarize daily sales
    summary = {
        "date": partition_date_str,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")


# Create a partitioned asset job
daily_sales_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=[daily_sales_data, daily_sales_summary],
    partitions_def=daily_partitions,
)


# Create a schedule to run the job daily
@dg.schedule(
    job=daily_sales_job,
    cron_schedule="0 1 * * *",  # Run at 1:00 AM every day
)
def daily_sales_schedule(context):
    """Process the previous day's sales data."""
    # Calculate the previous day's date
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=date,
        partition_key=date,
    )


# Define the Definitions object
defs = dg.Definitions(
    assets=[daily_sales_data, daily_sales_summary],
    jobs=[daily_sales_job],
    schedules=[daily_sales_schedule],
)
```
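To try the snippet locally, one option (not shown in the PR) is to materialize a single partition in process; the date here is an arbitrary example:

```python
import dagster as dg

# Materialize one daily partition in process (uses an ephemeral instance)
dg.materialize(
    [daily_sales_data, daily_sales_summary],
    partition_key="2024-08-05",
)
```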
Deploy preview for dagster-docs-beta ready!

✅ Preview: https://dagster-docs-beta-e9l1oecao-elementl.vercel.app
https://dagster-docs-beta.dagster-docs.io

Built with commit 2e6b20a. This pull request is being automatically deployed with vercel-action.