Skip to content

Commit

Permalink
feat(asset-checks): allow asset check subsetting (#16672)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Build on #16610 and
#16219 to enable asset check
subselection. This change was painful and a doozy.

To wrangle this complexity in the future we need to:
- Encompass a succinct and interpretable version of `XXXSelection` to
seamlessly enable the asset selection and asset check selection use case
- Standardize our usage of `None`, `[]`, `{}`, when indicating that a
selection is defaulting to select all the objects (assets and asset
checks).
- Rather than using `None`, an explicit sentinel value should be used
(e.g. `XXXSelection.ALL`, so that the value indicates its usage.
Likewise, `[]` and `{}` should be replaced with `XXXSelection.EMPTY`).

## How I Tested These Changes
- When materializing a subsetted asset, there are four cases:
  - Materialize without a selection
  - Materialize with a selection of assets and checks
  - Materialize with only a selection of assets
  - Materialize with only a selection of checks
- Remove any usages of `AssetSelection` that break the new subsetting
invariants (e.g. #16638)
- Existing pytest
- Stack on dbt implementation

---------

Co-authored-by: Johann Miller <[email protected]>
  • Loading branch information
rexledesma and johannkm authored Sep 27, 2023
1 parent 9618aba commit e69cd3f
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ def downstream_asset():


just_checks_job = define_asset_job(
name="just_checks_job", selection=AssetSelection.all_asset_checks()
name="just_checks_job",
selection=AssetSelection.checks_for_assets(checked_asset),
)


Expand Down
83 changes: 54 additions & 29 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,19 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
node_handle: assets_defs_by_key[asset_key]
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
# nodes for assets
**{
node_handle: assets_defs_by_key[asset_key]
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
},
# nodes for asset checks. Required for AssetsDefs that have selected checks
# but not assets
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.check_keys
},
}

return AssetLayer(
Expand Down Expand Up @@ -806,33 +816,32 @@ def build_asset_selection_job(
# no selections, include everything
included_assets = list(assets)
excluded_assets = []
included_source_assets = []
included_checks = list(asset_checks)
included_source_assets = list(source_assets)
included_checks_defs = list(asset_checks)
else:
if asset_selection is not None:
(included_assets, excluded_assets) = _subset_assets_defs(assets, asset_selection)
included_source_assets = _subset_source_assets(source_assets, asset_selection)
else:
# if checks were specified, then exclude all assets
included_assets = []
excluded_assets = list(assets)
included_source_assets = []

if asset_check_selection is not None:
# NOTE: This filters to a checks def if any of the included specs are in the selection.
# This needs to change to fully subsetting checks in multi assets.
included_checks = [
# Filter to assets that match either selected assets or include a selected check.
# E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check
# defined with check_specs
(included_assets, excluded_assets) = _subset_assets_defs(
assets, asset_selection or set(), asset_check_selection
)
included_source_assets = _subset_source_assets(source_assets, asset_selection or set())

if asset_check_selection is None:
# If assets were selected and checks are None, then include all checks on the selected assets.
# Note: once we start explicitly passing in asset checks instead of None from the front end,
# we can remove this logic.
included_checks_defs = [
asset_check
for asset_check in asset_checks
if [spec for spec in asset_check.specs if spec.key in asset_check_selection]
if asset_check.asset_key in check.not_none(asset_selection)
]
else:
# If assets were selected and checks weren't, then include all checks on the selected assets.
# Note: a future diff needs to add support for selecting assets, and not their checks.
included_checks = [
# Otherwise, filter to explicitly selected checks defs
included_checks_defs = [
asset_check
for asset_check in asset_checks
if asset_check.asset_key in check.not_none(asset_selection)
if [spec for spec in asset_check.specs if spec.key in asset_check_selection]
]

if partitions_def:
Expand All @@ -844,11 +853,11 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks) > 0:
if len(included_assets) or len(included_checks_defs) > 0:
asset_job = build_assets_job(
name=name,
assets=included_assets,
asset_checks=included_checks,
asset_checks=included_checks_defs,
config=config,
source_assets=[*source_assets, *excluded_assets],
resource_defs=resource_defs,
Expand Down Expand Up @@ -880,6 +889,7 @@ def build_asset_selection_job(
def _subset_assets_defs(
assets: Iterable["AssetsDefinition"],
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]:
"""Given a list of asset key selection queries, generate a set of AssetsDefinition objects
representing the included/excluded definitions.
Expand All @@ -890,18 +900,33 @@ def _subset_assets_defs(
for asset in set(assets):
# intersection
selected_subset = selected_asset_keys & asset.keys

# if specific checks were selected, only include those
if selected_asset_check_keys is not None:
selected_check_subset = selected_asset_check_keys & asset.check_keys
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle for handle in asset.check_keys if handle.asset_key in selected_subset
}

# all assets in this def are selected
if selected_subset == asset.keys:
if selected_subset == asset.keys and selected_check_subset == asset.check_keys:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0:
elif len(selected_subset) == 0 and len(selected_check_subset) == 0:
excluded_assets.add(asset)
elif asset.can_subset:
# subset of the asset that we want
subset_asset = asset.subset_for(selected_asset_keys)
subset_asset = asset.subset_for(selected_asset_keys, selected_check_subset)
included_assets.add(subset_asset)
# subset of the asset that we don't want
excluded_assets.add(asset.subset_for(asset.keys - subset_asset.keys))
excluded_assets.add(
asset.subset_for(
selected_asset_keys=asset.keys - subset_asset.keys,
selected_asset_check_keys=(asset.check_keys - subset_asset.check_keys),
)
)
else:
raise DagsterInvalidSubsetError(
f"When building job, the AssetsDefinition '{asset.node_def.name}' "
Expand Down
55 changes: 48 additions & 7 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
_backfill_policy: Optional[BackfillPolicy]
_code_versions_by_key: Mapping[AssetKey, Optional[str]]
_descriptions_by_key: Mapping[AssetKey, str]
_selected_asset_check_keys: AbstractSet[AssetCheckKey]

def __init__(
self,
Expand All @@ -109,6 +110,7 @@ def __init__(
backfill_policy: Optional[BackfillPolicy] = None,
descriptions_by_key: Optional[Mapping[AssetKey, str]] = None,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
# if adding new fields, make sure to handle them in the with_attributes, from_graph, and
# get_attributes_dict methods
):
Expand Down Expand Up @@ -254,18 +256,22 @@ def __init__(
backfill_policy, "backfill_policy", BackfillPolicy
)

if selected_asset_keys is None:
if selected_asset_check_keys is None:
self._check_specs_by_output_name = check_specs_by_output_name or {}
else:
self._check_specs_by_output_name = {
output_name: check_spec
for output_name, check_spec in (check_specs_by_output_name or {}).items()
if check_spec.asset_key in selected_asset_keys
if check_spec.key in selected_asset_check_keys
}

self._check_specs_by_handle = {
spec.key: spec for spec in self._check_specs_by_output_name.values()
}
if selected_asset_check_keys is not None:
self._selected_asset_check_keys = selected_asset_check_keys
else:
self._selected_asset_check_keys = self._check_specs_by_handle.keys()

if self._partitions_def is None:
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
Expand Down Expand Up @@ -305,6 +311,7 @@ def dagster_internal_init(
backfill_policy: Optional[BackfillPolicy],
descriptions_by_key: Optional[Mapping[AssetKey, str]],
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> "AssetsDefinition":
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
Expand All @@ -323,6 +330,7 @@ def dagster_internal_init(
backfill_policy=backfill_policy,
descriptions_by_key=descriptions_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=selected_asset_check_keys,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Expand Down Expand Up @@ -675,6 +683,7 @@ def _from_node(
can_subset=can_subset,
selected_asset_keys=None, # node has no subselection info
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None,
)

@public
Expand Down Expand Up @@ -858,6 +867,16 @@ def check_specs(self) -> Iterable[AssetCheckSpec]:
"""
return self._check_specs_by_output_name.values()

@property
def check_keys(self) -> AbstractSet[AssetCheckKey]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_keys

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Expand Down Expand Up @@ -1090,12 +1109,17 @@ def _subset_graph_backed_asset(

return get_graph_subset(self.node_def, op_selection)

def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets in the
selected set.
def subset_for(
self,
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets and checks
in the selected set.
Args:
selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys
selected_asset_check_keys (AbstractSet[AssetCheckKey]): The selected asset checks
"""
from dagster._core.definitions.graph_definition import GraphDefinition

Expand All @@ -1106,10 +1130,23 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin

# Set of assets within selected_asset_keys which are outputted by this AssetDefinition
asset_subselection = selected_asset_keys & self.keys
if selected_asset_check_keys is None:
# filter to checks that target selected asset keys
asset_check_subselection = {
key for key in self.check_keys if key.asset_key in asset_subselection
}
else:
asset_check_subselection = selected_asset_check_keys & self.check_keys

# Early escape if all assets in AssetsDefinition are selected
if asset_subselection == self.keys:
if asset_subselection == self.keys and asset_check_subselection == self.check_keys:
return self
elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset
check.invariant(
selected_asset_check_keys == self.check_keys,
"Subsetting graph-backed assets with checks is not yet supported",
)

subsetted_node = self._subset_graph_backed_asset(
asset_subselection,
)
Expand Down Expand Up @@ -1155,7 +1192,10 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))
else:
# multi_asset subsetting
replaced_attributes = dict(selected_asset_keys=asset_subselection)
replaced_attributes = {
"selected_asset_keys": asset_subselection,
"selected_asset_check_keys": asset_check_subselection,
}
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))

@public
Expand Down Expand Up @@ -1280,6 +1320,7 @@ def get_attributes_dict(self) -> Dict[str, Any]:
backfill_policy=self._backfill_policy,
descriptions_by_key=self._descriptions_by_key,
check_specs_by_output_name=self._check_specs_by_output_name,
selected_asset_check_keys=self._selected_asset_check_keys,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ def _dfs(key, cur_color):
ret.append(assets_def)
else:
for asset_keys in color_mapping.values():
ret.append(assets_def.subset_for(asset_keys))
ret.append(assets_def.subset_for(asset_keys, selected_asset_check_keys=None))

return ret

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
metadata_by_key={out_asset_key: self.metadata} if self.metadata else None,
descriptions_by_key=None, # not supported for now
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None, # no subselection in decorator
)


Expand Down Expand Up @@ -848,6 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
descriptions_by_key=None, # not supported for now
metadata_by_key=metadata_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None, # no subselection in decorator
)

return inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import (
DataProvenance,
Expand Down Expand Up @@ -561,6 +562,45 @@ def selected_asset_keys(self) -> AbstractSet[AssetKey]:
return set()
return self.assets_def.keys

@public
@property
def has_asset_checks_def(self) -> bool:
"""Return a boolean indicating the presence of a backing AssetChecksDefinition
for the current execution.
Returns:
bool: True if there is a backing AssetChecksDefinition for the current execution, otherwise False.
"""
return self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) is not None

@public
@property
def asset_checks_def(self) -> AssetChecksDefinition:
"""The backing AssetChecksDefinition for what is currently executing, errors if not
available.
Returns:
AssetChecksDefinition.
"""
asset_checks_def = self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle)
if asset_checks_def is None:
raise DagsterInvalidPropertyError(
f"Op '{self.op.name}' does not have an asset checks definition."
)

return asset_checks_def

@public
@property
def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
if self.has_assets_def:
return self.assets_def.check_keys

if self.has_asset_checks_def:
check.failed("Subset selection is not yet supported within an AssetChecksDefinition")

return set()

@public
@property
def selected_output_names(self) -> AbstractSet[str]:
Expand Down
Loading

0 comments on commit e69cd3f

Please sign in to comment.