Launching a single RunRequest for a set of partitions from a schedule or sensor #19457
Replies: 2 comments 3 replies
-
Maybe this #14622?
-
I have the same issue. My (quite ugly) workaround is to delete and re-insert the dynamic partitions inside the sensor so that they form a contiguous range:

```python
from dagster import RunRequest, SensorEvaluationContext, sensor

# partitions_def is the DynamicPartitionsDefinition used by the asset/job.
@sensor(job=...)
def sensor_do_job(
    context: SensorEvaluationContext,
):
    partition_keys = get_partition_keys_from_some_resource()
    if len(partition_keys) > 1:
        context.log.info(f"Clearing {len(partition_keys)} partitions")
        for partition in partition_keys:
            context.instance.delete_dynamic_partition(
                partitions_def.name, partition
            )
        context.log.info(f"Adding {len(partition_keys)} partitions")
        context.instance.add_dynamic_partitions(
            partitions_def.name,
            partition_keys=partition_keys,
        )
        return RunRequest(
            tags={
                "dagster/asset_partition_range_start": partition_keys[0],
                "dagster/asset_partition_range_end": partition_keys[-1],
            }
        )
    elif len(partition_keys) == 1:
        return RunRequest(partition_key=partition_keys[0])
```

By deleting and then re-inserting, the partition keys are appended as a continuous range and the run can therefore be launched using the range tags. The drawback is that while the partition keys are being deleted and re-added, another application using the same partitions may briefly observe them as missing.

I really think that assets which fill multiple partitions in a single run are missing an important feature here. In my specific use case the initialization cost comes from a machine learning model that is loaded from cluster storage into memory for inference. One possibility would be to serve this model from an endpoint in our Kubernetes cluster and thereby eliminate the resource setup cost, but that would add structural overhead and give up some of the ease of use of Dagster.

I would be very interested in how others have solved the problem of initialization cost, or in other workarounds for the problem described above. Thanks!
-
Is there a way to launch a single RunRequest for a set of partitions from a schedule or sensor, just like you can do a single-run backfill in the UI?

I have a partitioned asset with dynamic partitions (700–800 partitions) and I want to periodically refresh the table for some or all of the available partitions (essentially to keep my own replica of a dataset provided via an API). The asset itself can work with `partition_keys` (it can handle `BackfillPolicy.single_run()`), since refreshing a single partition only takes a relatively lightweight API call, and I have a neat resource wrapping the API, so it's easy to do all partitions within a single run (maybe even async).

But I can't seem to find a way to launch just one run for multiple partitions from a schedule. All the examples I saw iterate over the partition keys and create a `RunRequest` for each, but that won't work for me here: it creates too much overhead, with hundreds of runs that in turn trigger hundreds of unnecessary auto-materializations of downstream assets and checks, a flurry of Slack notifications, and so on, while the whole job could be finished in a matter of seconds with multiple partitions in a single run.
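To make the overhead concrete, the two strategies can be sketched with plain dicts standing in for run requests (the function names are hypothetical; the two tag strings are the range tags used elsewhere in this thread):

```python
RANGE_START_TAG = "dagster/asset_partition_range_start"
RANGE_END_TAG = "dagster/asset_partition_range_end"


def per_key_requests(keys):
    # The pattern most examples show: one request per partition key,
    # i.e. hundreds of runs for hundreds of keys.
    return [{"partition_key": k} for k in keys]


def single_range_request(keys):
    # One request covering the whole (contiguous) key range via range tags.
    return [{"tags": {RANGE_START_TAG: keys[0], RANGE_END_TAG: keys[-1]}}]


keys = [f"p{i:03d}" for i in range(750)]
print(len(per_key_requests(keys)), len(single_range_request(keys)))
# → 750 1
```

For 750 partitions the first approach schedules 750 runs (each potentially cascading into downstream auto-materializations), while the second schedules exactly one.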