Update: I found a workaround: wrap the chain of ops in a plain (non-op) function, then map that function over the dynamic output. Each mapped item then runs through its own copy of the ops in that function. Example:

import os
import time
from typing import List

from dagster import DynamicOutput, DynamicOutputDefinition, job, op

@op
def return_some_list(context):
    return [1, 2, 3, 4, 5]

@op(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    # Emit one dynamic output per item; each mapping key must be unique.
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')

# Plain function (not an op): chains the ops that should run once per item.
def process(num):
    num1 = sub_5(num)
    mkdir(num1)

@op
def sub_5(context, some_num: int):
    time.sleep(5)
    # context.log.info(str(some_num))
    some_num -= 5
    # mkdir(some_num)
    return some_num

@op
def mkdir(context, num: int):
    os.mkdir(str(num))
    context.log.info(str(num))

@job
def some_pipeline():
    output_list = return_some_list()
    # Apply the whole process chain to every dynamic output.
    generate_subtasks(output_list).map(process)
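For anyone trying to picture what mapping `process` does, here is a rough plain-Python analogy. It is only a sketch and not how Dagster works internally: the ops are replaced by ordinary functions, `describe` is a hypothetical stand-in for `mkdir` that returns a name instead of touching the disk, and `fan_out` with its `ThreadPoolExecutor` is an assumption used purely to show the per-item fan-out.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def sub_5(num: int) -> int:
    """Plain-function stand-in for the sub_5 op: subtract 5."""
    return num - 5


def describe(num: int) -> str:
    """Hypothetical stand-in for the mkdir op; returns a name, creates nothing."""
    return f"dir_{num}"


def process(num: int) -> str:
    """The per-item chain: every item runs sub_5, then describe."""
    return describe(sub_5(num))


def fan_out(nums: List[int]) -> Dict[str, str]:
    """Run the whole chain once per item, keyed like the mapping keys above."""
    with ThreadPoolExecutor() as pool:
        # pool.map preserves input order, so results line up with nums.
        results = list(pool.map(process, nums))
    return {f"subtask_{n}": r for n, r in zip(nums, results)}
```

Calling `fan_out([10, 20])` returns `{"subtask_10": "dir_5", "subtask_20": "dir_15"}`: one independent run of the full chain per item, which is the same shape the mapped job produces per mapping key.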
Hi there,
I am familiarizing myself with the DynamicOutput option. I understand that you can map an op over a list of items and run those ops in parallel. However, for the project I am working on, I would like to run what is essentially a separate instance of a chain of multiple ops for each item in a series. Is there a way to do this yet in Dagster?