From beea9d4a12f5a23e30c36481d40eac16186c204d Mon Sep 17 00:00:00 2001 From: Guus Frenken Date: Thu, 19 Sep 2024 15:00:10 +0200 Subject: [PATCH] Add the ability to define upstream deps to created Dagster asset --- elx/extensions/dagster/assets.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index b385004..92956f3 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,4 +1,4 @@ -from typing import Generator, List +from typing import Generator, Iterable, List, Sequence from elx import Runner from dagster import ( AssetsDefinition, @@ -7,6 +7,9 @@ Output, multi_asset, AssetOut, + SourceAsset, + AssetKey, + AssetDep, get_dagster_logger, ) from elx.extensions.dagster.utils import dagster_safe_name, generate_description @@ -14,7 +17,10 @@ logger = get_dagster_logger() -def load_assets(runner: Runner) -> List[AssetsDefinition]: +def load_assets( + runner: Runner, + deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None, +) -> List[AssetsDefinition]: """ Load the assets for a runner, each asset represents one tap target combination. @@ -67,6 +73,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: return [ multi_asset( name=f"run_{dagster_safe_name(runner.tap.executable)}_{dagster_safe_name(runner.target.executable)}", + deps=deps, outs={ dagster_safe_name(stream.name): AssetOut( is_required=False,