Hi Alec, there is currently no native way to do this. One potential workaround is to write a custom op that holds back the downstream compute until every dynamic fan-out step has either succeeded or failed. The downside is that you cannot directly access the outputs of the dynamic stage. You could do something along the lines of the following:

```python
import time
from typing import List

from dagster import DynamicOut, DynamicOutput, In, Nothing, OpExecutionContext, job, op

# Note: this is a private module and may move between Dagster releases.
from dagster._core.execution.stats import StepEventStatus

MONITOR_TIME = 10  # seconds to sleep between polls of the step stats


@op(out=DynamicOut(str))
def my_dynamic_out_op():
    for i in range(10):
        ...
        # Each DynamicOutput needs a mapping_key that is unique within the run.
        yield DynamicOutput(value=..., mapping_key=str(i))


@op(ins={"my_input": In(str)})
def my_fan_out_operation(my_input: str):
    ...


@op(ins={"my_inputs": In(List[str])})
def wait_for_completion(context: OpExecutionContext, my_inputs):
    # Poll until every fan-out step has started and none is still in progress.
    while True:
        stats = context.instance.get_run_step_stats(context.run_id)
        fan_out_stats = [
            stat
            for stat in stats
            if stat.step_key.startswith(my_fan_out_operation.name)
        ]
        if len(fan_out_stats) == len(my_inputs) and not any(
            stat.status == StepEventStatus.IN_PROGRESS for stat in fan_out_stats
        ):
            break
        time.sleep(MONITOR_TIME)


@op(ins={"wait": In(Nothing)})
def after_complete():
    ...


@job
def my_job():
    res = my_dynamic_out_op()
    res.map(my_fan_out_operation)
    # Collect the dynamic outputs themselves, not the mapped results, so this
    # branch does not depend on the fan-out steps succeeding.
    after_complete(wait=wait_for_completion(res.collect()))
```
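To make the exit condition of the polling loop easier to reason about, here is a minimal, dependency-free sketch of the same check. It is not Dagster code: the status strings, the `fan_out_finished` helper, and the `op_name[mapping_key]` step keys in the example are illustrative assumptions standing in for `StepEventStatus` and the records returned by `get_run_step_stats`.

```python
from typing import Iterable, Tuple

# Hypothetical stand-ins for Dagster's step statuses; plain strings keep
# this sketch free of any Dagster dependency.
IN_PROGRESS = "IN_PROGRESS"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"


def fan_out_finished(
    step_stats: Iterable[Tuple[str, str]],
    op_name: str,
    expected_steps: int,
) -> bool:
    """Return True once every mapped step of `op_name` has started
    and none of them is still running."""
    statuses = [
        status for step_key, status in step_stats if step_key.startswith(op_name)
    ]
    # Some fan-out steps have not produced any stats yet: keep waiting.
    if len(statuses) != expected_steps:
        return False
    # A failed step counts as finished; only IN_PROGRESS blocks completion.
    return all(status != IN_PROGRESS for status in statuses)


# Example: one mapped step succeeded and one failed, so the wait is over.
example_stats = [
    ("my_dynamic_out_op", SUCCESS),
    ("my_fan_out_operation[0]", SUCCESS),
    ("my_fan_out_operation[1]", FAILURE),
]
done = fan_out_finished(example_stats, "my_fan_out_operation", expected_steps=2)
```

Note that a prefix match would also pick up any other op whose name starts with the same string, so the fan-out op's name should be unambiguous within the job.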
I am using a job with `DynamicOut` and a single `.collect()` stage to handle the results. In my use case, it is critical that the downstream ops still run even if some of the ops feeding the collect have failed. In particular, if an infrastructure problem causes a container to get evicted and no more retries are allowed, I cannot handle that with a `try`/`except` inside the op, so the runner marks the step as a failure and refuses to run the downstream tasks.
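As a rough illustration of why a `try`/`except` inside the op cannot help here, the sketch below kills a child Python process with `SIGKILL`. This is an assumed stand-in for a container eviction, not something the Dagster run actually does, and it only works on POSIX systems. The child's `except` and `finally` blocks never get a chance to run.

```python
import signal
import subprocess
import sys

# Child program: it tries to guard itself with try/except/finally, then is
# killed with SIGKILL, which cannot be caught or handled in-process.
CHILD_SOURCE = """
import os, signal
try:
    os.kill(os.getpid(), signal.SIGKILL)
except BaseException:
    print("handled")
finally:
    print("cleanup")
"""


def run_killed_child() -> subprocess.CompletedProcess:
    """Run the child in a separate interpreter and capture what it printed."""
    return subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        capture_output=True,
        text=True,
    )


result = run_killed_child()
# On POSIX, a negative return code means the child was terminated by that signal.
killed_by_sigkill = result.returncode == -signal.SIGKILL
```

Since neither handler prints anything, the failure can only be observed from outside the process, which is why the handling has to live in the orchestration layer rather than in the op body.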