Sensor to trigger job over partition subset update #24351
I am trying to write a sensor that triggers updates of a downstream asset's partitions whenever an upstream asset's partitions are updated. The issue I am running into is that I can only get it to trigger off a single partition update, not all of the partitions that are updated (7).
import dagster

daily_partition_def = dagster.DailyPartitionsDefinition(start_date="2024-08-01")


@dagster.asset(partitions_def=daily_partition_def)
def raw_order_data():
    pass


@dagster.asset(partitions_def=daily_partition_def)
def top_products_file():
    pass


raw_order_data_job = dagster.define_asset_job(
    name="raw_order_data",
    selection="raw_order_data",
    partitions_def=daily_partition_def,
)

top_products_job = dagster.define_asset_job(
    name="top_products",
    selection="top_products_file",
    partitions_def=daily_partition_def,
)

_RECENT_ORDER_DATA_WINDOW = 7


@dagster.schedule(job=raw_order_data_job, cron_schedule="*/30 * * * *")
def trailing_seven_days_order_data_schedule(context: dagster.ScheduleEvaluationContext):
    partition_keys = daily_partition_def.get_partition_keys(
        dynamic_partitions_store=context.instance
    )
    partition_keys = sorted(partition_keys)[::-1][:_RECENT_ORDER_DATA_WINDOW]
    runs = [
        dagster.RunRequest(
            run_key=f"{context.scheduled_execution_time}_{pt}", partition_key=pt
        )
        for pt in partition_keys
    ]
    return runs


# TODO: this appears to only do 1 partition
@dagster.asset_sensor(
    asset_key=dagster.AssetKey("raw_order_data"), job=top_products_job
)
def top_products_sensor(
    context: dagster.SensorEvaluationContext, asset_event: dagster.EventLogEntry
):
    dag_event = asset_event.get_dagster_event()
    if dag_event.is_step_materialization:
        yield dagster.RunRequest(
            run_key=context.cursor,
            partition_key=dag_event.partition,
        )
Replies: 1 comment
Hi Adam. By default, @asset_sensor will just fetch the latest new event log entry and pass that as a param to the decorated function, which is why you're only receiving one partition.

If you wanted to make a sensor retrieve all new event log entries you could define a @multi_asset_sensor instead, something like this example.

Or, you could just use declarative automation to handle this for you.