Skip to content

Commit

Permalink
Add scheduling_policy via Metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 12, 2024
1 parent d4c988c commit 5413c48
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 0 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING, Optional

from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey

if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition


class SchedulingPolicy:
METADATA_KEY = "dagster/scheduling_policy"
...

@staticmethod
def of_assets_def(
assets_def: "AssetsDefinition", asset_key: Optional[CoercibleToAssetKey] = None
) -> Optional["SchedulingPolicy"]:
asset_key = AssetKey.from_coercible(asset_key) if asset_key else assets_def.key
value = assets_def.metadata_by_key[asset_key].get(SchedulingPolicy.METADATA_KEY)
return value if isinstance(value, SchedulingPolicy) else None
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Tuple

from dagster import asset, multi_asset
from dagster._core.declarative_scheduling.policy import SchedulingPolicy
from dagster._core.definitions.asset_spec import AssetSpec


def test_basic_scheduling_policy_inclusion() -> None:
scheduling_policy = SchedulingPolicy()

@asset(metadata={SchedulingPolicy.METADATA_KEY: scheduling_policy})
def an_asset() -> None: ...

assert SchedulingPolicy.of_assets_def(an_asset, an_asset.key) is scheduling_policy
assert SchedulingPolicy.of_assets_def(an_asset) is scheduling_policy

scheduling_policy_one = SchedulingPolicy()
scheduling_policy_two = SchedulingPolicy()

specs = [
# https://linear.app/dagster-labs/issue/FOU-99/more-principled-approach-to-unserializable-metadata for context
# on the type ignores
AssetSpec("asset_one", metadata={SchedulingPolicy.METADATA_KEY: scheduling_policy_one}), # type: ignore
AssetSpec("asset_two", metadata={SchedulingPolicy.METADATA_KEY: scheduling_policy_two}), # type: ignore
]

@multi_asset(specs=specs)
def a_multi_asset() -> Tuple[None, None]: ...

assert SchedulingPolicy.of_assets_def(a_multi_asset, "asset_one") is scheduling_policy_one
assert SchedulingPolicy.of_assets_def(a_multi_asset, "asset_two") is scheduling_policy_two

0 comments on commit 5413c48

Please sign in to comment.