Skip to content

Commit

Permalink
[external-assets] Rename AssetLayer.asset_keys, rm has_assets_defs (#…
Browse files Browse the repository at this point in the history
…20473)

## Summary & Motivation

- Rename `AssetLayer.asset_keys` to `executable_asset_keys`. This
matches the meaning used in the `AssetGraph`.
- Remove `has_assets_defs` as this method no longer does what it was
intended to do (check for executable assets).

## How I Tested These Changes

Existing test suite.
  • Loading branch information
smackesey authored Mar 14, 2024
1 parent 5fad20d commit f51062f
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame:

class SmokeIOManager(InMemoryIOManager):
def load_input(self, context):
if context.asset_key not in context.step_context.job_def.asset_layer.asset_keys:
if context.asset_key not in context.step_context.job_def.asset_layer.executable_asset_keys:
column_schema = context.upstream_output.metadata["column_schema"]
return empty_dataframe_from_column_schema(column_schema)
else:
Expand Down
16 changes: 6 additions & 10 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,12 @@ def downstream_assets_for_asset(self, asset_key: AssetKey) -> AbstractSet[AssetK
return {k for k, v in self.asset_deps.items() if asset_key in v}

@property
def asset_keys(self) -> Iterable[AssetKey]:
return self.dependency_node_handles_by_asset_key.keys()
def all_asset_keys(self) -> Iterable[AssetKey]:
return set(self.assets_defs_by_key.keys())

@property
def has_assets_defs(self) -> bool:
return len(self.assets_defs_by_key) > 0
def executable_asset_keys(self) -> Iterable[AssetKey]:
return self.dependency_node_handles_by_asset_key.keys()

@property
def assets_defs(self) -> Set["AssetsDefinition"]:
Expand Down Expand Up @@ -760,12 +760,8 @@ def asset_check_key_for_output(
) -> Optional[AssetCheckKey]:
return self.check_key_by_node_output_handle.get(NodeOutputHandle(node_handle, output_name))

def group_names_by_assets(self) -> Mapping[AssetKey, str]:
return {
key: assets_def.group_names_by_key[key]
for key, assets_def in self.assets_defs_by_key.items()
if key in assets_def.group_names_by_key
}
def group_name_for_asset(self, asset_key: AssetKey) -> str:
return self.assets_defs_by_key[asset_key].group_names_by_key[asset_key]

def partitions_def_for_asset(self, asset_key: AssetKey) -> Optional["PartitionsDefinition"]:
assets_def = self.assets_defs_by_key.get(asset_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,9 @@ def _get_job_def_for_asset_selection(
check.opt_set_param(asset_check_selection, "asset_check_selection", AssetCheckKey)

nonexistent_assets = [
asset for asset in asset_selection if asset not in self.asset_layer.asset_keys
asset
for asset in asset_selection
if asset not in self.asset_layer.executable_asset_keys
]
nonexistent_asset_strings = [
asset_str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ def get_inputs_field(
if inp.input_manager_key:
input_field = get_input_manager_input_field(node, inp, resource_defs)
elif (
# if you have asset definitions, input will be loaded from the source asset
asset_layer.has_assets_defs
# if you have executable assets defs, input will be loaded from the source asset
asset_layer.executable_asset_keys
or asset_layer.has_asset_check_defs
and asset_layer.asset_key_for_input(handle, name)
and not has_upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ def _get_output_asset_events(
if (
execution_type == AssetExecutionType.MATERIALIZATION
and step_context.is_external_input_asset_version_info_loaded
and asset_key in step_context.job_def.asset_layer.asset_keys
and asset_key in step_context.job_def.asset_layer.executable_asset_keys
):
assert isinstance(output, Output)
code_version = _get_code_version(asset_key, step_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,9 @@ def external_asset_nodes_from_defs(
if len(assets_def.keys) == 1 and assets_def.check_keys and not assets_def.can_subset:
execution_set_identifiers[assets_def.key] = assets_def.unique_id

group_name_by_asset_key.update(asset_layer.group_names_by_assets())
group_name_by_asset_key.update(
{k: asset_layer.group_name_for_asset(k) for k in asset_layer.all_asset_keys}
)

asset_nodes: List[ExternalAssetNode] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2081,7 +2081,7 @@ def asset1(): ...

assert Definitions(
assets=[source_asset, asset1], jobs=[define_asset_job("something", selection="abc/asset1")]
).get_job_def("something").asset_layer.asset_keys == {AssetKey(["abc", "asset1"])}
).get_job_def("something").asset_layer.executable_asset_keys == {AssetKey(["abc", "asset1"])}


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def asset3(): ...
all_assets = [asset1, asset2, asset3]

job1 = create_test_asset_job(all_assets, selection=[asset1, asset2])
asset_keys = list(job1.asset_layer.asset_keys)
asset_keys = list(job1.asset_layer.executable_asset_keys)
assert len(asset_keys) == 2
assert set(asset_keys) == {asset1.key, asset2.key}
job1.execute_in_process()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def my_repo():
return [foo, asset1, asset2]

assert len(my_repo.get_all_jobs()) == 1
assert set(my_repo.get_all_jobs()[0].asset_layer.asset_keys) == {
assert set(my_repo.get_all_jobs()[0].asset_layer.executable_asset_keys) == {
AssetKey(["asset1"]),
AssetKey(["asset2"]),
}
Expand Down Expand Up @@ -1192,7 +1192,7 @@ def assets_repo():
return [all_assets]

assert len(assets_repo.get_all_jobs()) == 1
assert set(assets_repo.get_all_jobs()[0].asset_layer.asset_keys) == {
assert set(assets_repo.get_all_jobs()[0].asset_layer.executable_asset_keys) == {
AssetKey(["asset1"]),
AssetKey(["asset2"]),
}
Expand Down Expand Up @@ -1240,7 +1240,7 @@ def combo_repo():
return [combo_list]

assert len(combo_repo.get_all_jobs()) == 2
assert set(combo_repo.get_all_jobs()[0].asset_layer.asset_keys) == {
assert set(combo_repo.get_all_jobs()[0].asset_layer.executable_asset_keys) == {
AssetKey(["asset3"]),
}

Expand Down Expand Up @@ -1418,7 +1418,7 @@ def repo():
assert sorted(repo.get_implicit_asset_job_names()) == ["__ASSET_JOB_0", "__ASSET_JOB_1"]
assert repo.get_implicit_job_def_for_assets(
[asset1.key, asset2.key]
).asset_layer.asset_keys == {
).asset_layer.executable_asset_keys == {
asset1.key,
asset2.key,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def yield_result(self, value, output_name="result"):
# Note: yield_result currently does not support DynamicOutput

# dagstermill assets do not support yielding additional results within the notebook:
if len(step_context.job_def.asset_layer.asset_keys) > 0:
if len(step_context.job_def.asset_layer.executable_asset_keys) > 0:
raise DagstermillError(
"dagstermill assets do not currently support dagstermill.yield_result"
)
Expand Down

0 comments on commit f51062f

Please sign in to comment.