@graph_multi_asset - help please #24341
-
Apologies up front if I am misinterpretting the docs but I haven't been able to get my use case working or find any other references to resolve my issue. What I am trying to do. Any help would be appreciated. I am sure I am close but probably doing something small wrong. DAGSTER_VERSION = 1.8.4 Below is my code @graph_multi_asset(
group_name="CORE",
outs={
"trades": AssetOut(),
"dividends": AssetOut(),
"trade_actions": AssetOut(),
"transactions": AssetOut(),
"adjustments": AssetOut(),
},
)
def summary_report():
report: MaterialExcel = get_summary_report_from_email(gen_report())
return {
"trades": report.workbook_sheets["Trades"],
"dividends": report.workbook_sheets["Dividends"],
"trade_actions": report.workbook_sheets["Corporate Actions"],
"transactions": report.workbook_sheets["Deposits & Withdrawals"],
"adjustments": report.workbook_sheets["Other & Adjustments"],
}
@op
def gen_report(selenium_grid: SeleniumGrid, auth: AuthCreds) -> None:
pass
@op(ins={"waits_for": In(Nothing)})
def get_summary_report_from_email(context: OpExecutionContext) -> MaterialExcel:
report_file = "20240318-summary-report.xlsx"
report = MaterialExcel(file_name=report_file, workbook_sheets=pd.read_excel(f"data/raw/{report_file}", sheet_name=None))
# fmt: off
context.add_output_metadata({
"file_name": report.file_name,
"sheet_names": list(report.workbook_sheets.keys()),
"sheet_count": len(report.workbook_sheets.keys()),
})
# fmt: on
return report
class MaterialExcel(BaseModel):
file_name: str
workbook_sheets: dict The following code errors out when you start the dagster dev server with the following error. dagster._core.errors.DagsterUserCodeLoadError: Error occurred during the loading of Dagster definitions in
executable_path=/Users/<redacted>/Code/.venv/bin/python, module_name=data_pipelines.run, working_directory=/Users/<redacted>/Code/data-pipelines
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 419, in __init__
self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
^^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 242, in __init__
with user_code_error_boundary(
File "/Users/<redacted>/.pyenv/versions/3.12.5/lib/python3.12/contextlib.py", line 158, in __exit__
self.gen.throw(value)
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
AttributeError: 'InvokedNodeOutputHandle' object has no attribute 'workbook_sheets'
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
yield
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 253, in __init__
loadable_targets = get_loadable_targets(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_grpc/utils.py", line 50, in get_loadable_targets
else loadable_targets_from_python_module(module_name, working_directory)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/workspace/autodiscovery.py", line 31, in loadable_targets_from_python_module
module = load_python_module(
^^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/code_pointer.py", line 134, in load_python_module
return importlib.import_module(module_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/.pyenv/versions/3.12.5/lib/python3.12/importlib/__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/Users/<redacted>/Code/data_pipelines/run.py", line 3, in <module>
from .assets import core
File "/Users/<redacted>/Code/data_pipelines/assets/core/__init__.py", line 39, in <module>
@graph_multi_asset(
^^^^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/asset_decorator.py", line 967, in inner
op_graph = graph(
^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/graph_decorator.py", line 79, in __call__
) = do_composition(
^^^^^^^^^^^^^^^
File "/Users/<redacted>/Code/.venv/lib/python3.12/site-packages/dagster/_core/definitions/composition.py", line 949, in do_composition
output = fn(**kwargs)
^^^^^^^^^^^^
File "/Users/<redacted>/Code/data_pipelines/assets/core/__init__.py", line 53, in summary_report_v2
"trades": report.workbook_sheets["Trades"],
^^^^^^^^^^^^^^^^^^^^^^ |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Hey @darktempla - thanks for your patience on this one. The issue here is that Dagster doesn't allow runtime code execution in composition functions like I'd recommend modifying |
Beta Was this translation helpful? Give feedback.
Hey @darktempla - thanks for your patience on this one. The issue here is that Dagster doesn't allow runtime code execution in composition functions like
@graph_multi_asset
. Here's a deeper explanation of why: #12554.I'd recommend modifying
get_summary_report_from_email
to have separateOut
s for trades, dividends, etc. Or creating a downsteam op to do that unpacking.