How to wait for queued materializations of a partitioned upstream asset? #20943
I have a partitioned asset (dynamic partitions, but that's probably not important here) with ~100 partitions and a non-partitioned downstream asset with an eager auto-materialize policy. When a sensor triggers runs for some or all of the partitions of the upstream asset, each run in turn triggers a materialization of the downstream asset (and its children), even if a long list of runs is still queued for the upstream, resulting in lots of expensive (and mostly unnecessary) runs. I would expect Dagster to detect that materializations are still in progress, wait until the last queued run finishes, and only then materialize the downstream asset. Is there a way to make Dagster behave like that? Some combination of partition mappings and auto-materialize policies, maybe?
Replies: 2 comments 3 replies
Regarding the behavior you're experiencing: to control when downstream assets are materialized, you might consider using a custom sensor instead of relying solely on the eager auto-materialize policy. A custom sensor gives more granular control over when downstream materializations are triggered. For example, you could write a sensor that checks whether all relevant upstream partitions have been materialized before triggering the downstream asset.

The documentation provides an example of a custom sensor that monitors upstream assets and triggers materializations of downstream assets based on certain conditions. Here's a simplified version of such a sensor:

    from dagster import AssetKey, RunRequest, sensor

    # `downstream_job` is assumed to be defined elsewhere and to materialize the downstream asset.
    @sensor(job=downstream_job)
    def custom_downstream_sensor(context):
        upstream_asset_key = AssetKey("upstream_asset")
        # The call takes a list of asset keys and maps each one to {partition_key: count}.
        counts = context.instance.get_materialization_count_by_partition(
            [upstream_asset_key]
        ).get(upstream_asset_key, {})
        # "upstream_partitions" is a placeholder for the dynamic partitions definition name.
        expected = context.instance.get_dynamic_partitions("upstream_partitions")
        # Check that every known partition of the upstream asset has been materialized.
        if expected and all(counts.get(key, 0) > 0 for key in expected):
            # A fixed run_key is deduplicated by Dagster, so this fires at most once.
            yield RunRequest(run_key="downstream_run", run_config={})

This is just a conceptual example, and you would need to adapt it to your specific use case, including the logic that determines whether all relevant upstream partitions have been materialized.

Additionally, there are discussions in the Dagster community about similar use cases, where users want to prevent auto-materialization until all in-progress parent materializations are complete. While there isn't a built-in feature to handle this scenario directly, the community often suggests custom sensors or other workarounds as a solution.
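The gating condition described above (every upstream partition materialized, and no upstream run still queued or executing) can be kept separate from the sensor so it is easy to test. The following is a minimal sketch in plain Python, not a Dagster API: `should_trigger_downstream` and `IN_FLIGHT_STATUSES` are hypothetical names, and the status strings are assumed to match the names of Dagster's `DagsterRunStatus` values for unfinished runs.

```python
from typing import Iterable, Mapping

# Assumption: these names mirror DagsterRunStatus values for runs that have not finished.
IN_FLIGHT_STATUSES = frozenset({"QUEUED", "NOT_STARTED", "STARTING", "STARTED"})


def should_trigger_downstream(
    expected_partitions: Iterable[str],
    materialization_counts: Mapping[str, int],
    upstream_run_statuses: Iterable[str],
) -> bool:
    """Return True only when the upstream asset is complete and idle."""
    # Hold off while any upstream run is still queued or executing.
    if any(status in IN_FLIGHT_STATUSES for status in upstream_run_statuses):
        return False
    expected = list(expected_partitions)
    # With no known partitions there is nothing to build from, so do not trigger.
    if not expected:
        return False
    # Every expected partition needs at least one recorded materialization.
    return all(materialization_counts.get(key, 0) > 0 for key in expected)


# Two of three partitions done and one run still queued: keep waiting.
print(should_trigger_downstream(["a", "b", "c"], {"a": 1, "b": 1}, ["SUCCESS", "QUEUED"]))
# All partitions done and no run in flight: the downstream asset can be triggered.
print(should_trigger_downstream(["a", "b", "c"], {"a": 1, "b": 2, "c": 1}, ["SUCCESS"]))
```

Inside a sensor, the statuses could plausibly be collected with `context.instance.get_runs(...)` and a `RunsFilter` restricted to the upstream job, but how runs are matched to the upstream asset depends on your setup and is left as an assumption here.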
This has been answered in this discussion: #19214 (comment)