How to use run_requests
and dynamic_partitions_requests
with SensorResult
#23248
-
@jamiedemaria, I can't undesrstand how to construct run_requests and dynamic_partitions_requests for the SensorResult to return it from sensor? Originally posted by @andkuzmich in #14334 (reply in thread) |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
here's a code snippet dynamic_colors = DynamicPartitionsDefinition(name="dynamic_colors")
@asset(
partitions_def=dynamic_colors,
)
def colors_asset(context: AssetExecutionContext):
context.log.info(context.partition_key)
@sensor(asset_selection=[colors_asset])
def run_colors():
colors = ["purple", "orange"]
run_requests = []
for color in colors:
run_requests.append(RunRequest(run_key=color, partition_key=color))
return SensorResult(
run_requests=run_requests,
dynamic_partitions_requests=[AddDynamicPartitionsRequest(partitions_def_name="dynamic_colors", partition_keys=colors)]
)
defs = Definitions(assets=[colors_asset], sensors=[run_colors]) the sensors concept page has some more information about run requests. You can learn more about dynamic partitions and |
Beta Was this translation helpful? Give feedback.
-
@jamiedemaria, thank you for your reply!
And my question was: what is the correct way to say Dagster that it should run a partition with the defined day and list of 'colours' in this case? The code snippet that you provided is about DynamicPartitions usage. My problem is related to IoT where I have daily partitioned data but for different sensors. I have one asset, that collects sensors that was available for that day, than I have the sensor that monitors the asset materialization and it recieves the collected sensor names and then I hope to trigger the another asset materialization per time window per sensor name. I'm new with Dagster and I totally stuck with this. |
Beta Was this translation helpful? Give feedback.
-
ah, apologies for the confusion about that. If you want to make a dynamic_colors = DynamicPartitionsDefinition(name="dynamic_colors")
partitions_def = MultiPartitionsDefinition(
{
"time": DailyPartitionsDefinition(start_date="2022-01-01"),
"color": dynamic_colors,
}
)
@asset(
partitions_def=partitions_def,
)
def colors_asset(context: AssetExecutionContext):
context.log.info(context.partition_key)
@sensor(asset_selection=[colors_asset])
def run_colors():
colors = ["purple", "orange"]
run_requests = []
for color in colors:
run_requests.append(RunRequest(run_key=color, partition_key=MultiPartitionKey({"time": "2022-01-05", "color": color})))
return SensorResult(
run_requests=run_requests,
dynamic_partitions_requests=[AddDynamicPartitionsRequest(partitions_def_name="dynamic_colors", partition_keys=colors)]
)
defs = Definitions(assets=[colors_asset], sensors=[run_colors]) |
Beta Was this translation helpful? Give feedback.
here's a code snippet