Skip to content

Commit

Permalink
Add the ability to define upstream deps to created Dagster asset
Browse files Browse the repository at this point in the history
  • Loading branch information
guusfrenken-wk committed Sep 19, 2024
1 parent 5731013 commit beea9d4
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Generator, List
from typing import Generator, Iterable, List, Sequence
from elx import Runner
from dagster import (
AssetsDefinition,
Expand All @@ -7,14 +7,20 @@
Output,
multi_asset,
AssetOut,
SourceAsset,
AssetKey,
AssetDep,
get_dagster_logger,
)
from elx.extensions.dagster.utils import dagster_safe_name, generate_description

logger = get_dagster_logger()


def load_assets(runner: Runner) -> List[AssetsDefinition]:
def load_assets(
runner: Runner,
deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None,
) -> List[AssetsDefinition]:
"""
Load the assets for a runner, each asset represents one tap target combination.
Expand Down Expand Up @@ -67,6 +73,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
return [
multi_asset(
name=f"run_{dagster_safe_name(runner.tap.executable)}_{dagster_safe_name(runner.target.executable)}",
deps=deps,
outs={
dagster_safe_name(stream.name): AssetOut(
is_required=False,
Expand Down

0 comments on commit beea9d4

Please sign in to comment.