How to implement multiple production pathways for a single asset #23705
I am trying to set up a fairly simple use case where a single asset can be produced via multiple pathways. I've been scouring the documentation, but I can't for the life of me figure out how to do this. Forgive the somewhat contrived example, but let's say I have two upstream assets, `pine_tree` and `pine_table`, either of which could be used to produce `pine_planks`. I can imagine you'd do something like this:

```python
from dagster import asset, AssetIn

@asset
def pine_tree():
    # Logic to produce PineTree
    return "PineTree"

@asset
def pine_table():
    # Logic to produce PineTable
    return "PineTable"

@asset(ins={"pine_tree": AssetIn(), "pine_table": AssetIn()})
def pine_planks(context, pine_tree=None, pine_table=None):
    if pine_tree:
        # Logic to produce PinePlanks from PineTree
        return f"PinePlanks from {pine_tree}"
    elif pine_table:
        # Logic to produce PinePlanks from PineTable
        return f"PinePlanks from {pine_table}"
    else:
        raise ValueError("Either pine_tree or pine_table must be provided")
```

However, this wouldn't quite work, since if both a…
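The selection logic in `pine_planks` can be sketched framework-free as a list of pathways tried in priority order, so that adding a third source is just one more entry. This is plain Python, not a Dagster API: `PATHWAYS` and `produce_pine_planks` are hypothetical names, and the sketch assumes a missing upstream value is represented as `None` (rather than relying on truthiness as the code above does). It only illustrates the branching; it does not change how Dagster loads an asset's declared inputs.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical registry: each pathway names the upstream value it needs and
# a function that turns that value into pine planks. Order = priority.
PATHWAYS: List[Tuple[str, Callable[[str], str]]] = [
    ("pine_tree", lambda tree: f"PinePlanks from {tree}"),
    ("pine_table", lambda table: f"PinePlanks from {table}"),
]


def produce_pine_planks(available: Dict[str, Optional[str]]) -> str:
    """Use the first pathway whose upstream value is present (not None)."""
    for source_name, transform in PATHWAYS:
        value = available.get(source_name)
        if value is not None:
            return transform(value)
    raise ValueError("Either pine_tree or pine_table must be provided")


print(produce_pine_planks({"pine_table": "PineTable"}))  # PinePlanks from PineTable
```

When both upstream values are present, the first entry in `PATHWAYS` wins, which makes the tie-breaking rule explicit instead of implicit in an `if`/`elif` chain.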
Replies: 1 comment
Both upstream assets would have to be materialized successfully for this structure to work. If they are both inputs or dependencies of a downstream asset, you can't materialize just one of them and still have the downstream asset run from it.

Depending on your use case, you could consider other approaches, such as graph-backed assets or a multi-asset that shares a resource, as in this example:

```python
from dagster import resource, multi_asset, AssetOut, Output, Definitions, OpExecutionContext

@resource
def my_resource():
    def my_method():
        return "shared_value_from_my_resource"
    return my_method

@multi_asset(
    outs={
        "asset_1": AssetOut(is_required=False),
        "asset_2": AssetOut(is_required=False)
    },
    required_resource_keys={"my_resource"},
    can_subset=True
)
def my_assets_sharing_a_resource(context: OpExecutionContext):
    # all assets that will be output from this single function can share the value from the same resource
    shared_value = context.resources.my_resource()
    if "asset_1" in context.selected_output_names:
        # logic for defining the output of the asset named asset_1
        combined_output_1 = "asset_1 result: " + shared_value
        context.log.info(combined_output_1)
        yield Output(combined_output_1, output_name="asset_1")
    if "asset_2" in context.selected_output_names:
        # logic for defining the output of the asset named asset_2
        combined_output_2 = "asset_2 result: " + shared_value
        context.log.info(combined_output_2)
        yield Output(combined_output_2, output_name="asset_2")

defs = Definitions(
    assets=[my_assets_sharing_a_resource],
    resources={"my_resource": my_resource},
)
```
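To see what `can_subset=True` plus the `selected_output_names` checks buy you without a Dagster install, the control flow can be mimicked in plain Python. This is only an imitation of the branching, not of how Dagster actually executes a multi-asset: `shared_resource`, `my_assets_body`, and `run_subset` are hypothetical stand-ins for the resource, the decorated function body, and Dagster passing in the selected outputs.

```python
from typing import Callable, Dict, Iterator, Set, Tuple


def shared_resource() -> str:
    # Stand-in for calling my_method() from my_resource.
    return "shared_value_from_my_resource"


def my_assets_body(
    selected_output_names: Set[str], get_shared: Callable[[], str]
) -> Iterator[Tuple[str, str]]:
    # Mirrors the multi-asset body: fetch the shared value once, then
    # emit (output_name, value) only for outputs selected in this run.
    shared_value = get_shared()
    if "asset_1" in selected_output_names:
        yield "asset_1", "asset_1 result: " + shared_value
    if "asset_2" in selected_output_names:
        yield "asset_2", "asset_2 result: " + shared_value


def run_subset(selected: Set[str]) -> Dict[str, str]:
    # Hypothetical driver: collect whatever the body yields into a dict.
    return dict(my_assets_body(selected, shared_resource))


print(run_subset({"asset_1"}))
# {'asset_1': 'asset_1 result: shared_value_from_my_resource'}
```

Selecting only `asset_1` produces only `asset_1`, which is why the outputs are marked `is_required=False`: a run that requests a subset is not expected to emit every output.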