Skip to content

Commit

Permalink
add method impls to asset context
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 18, 2023
1 parent 2241dbb commit 4694660
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1272,9 +1272,7 @@ def asset_check_spec(self) -> AssetCheckSpec:

PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead"
INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly"
OUTPUT_METADATA_ALT = ( # TODO - fix this recommendation, as MaterializationResult does not exist
"return MaterializationResult from the asset instead"
)
OUTPUT_METADATA_ALT = "return MaterializeResult from the asset instead"

DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = {
"add_output_metadata": OUTPUT_METADATA_ALT,
Expand All @@ -1300,9 +1298,7 @@ def asset_check_spec(self) -> AssetCheckSpec:
"has_tag": "use dagster_run.has_tag instead",
"get_tag": "use dagster_run.get_tag instead",
"run_tags": "use dagster_run.tags instead",
"set_data_version": (
"use MaterializationResult instead"
), # TODO - fix this recommendation, as MaterializationResult does not exist
"set_data_version": "use MaterializeResult instead",
}


Expand Down Expand Up @@ -1386,19 +1382,20 @@ def provenance(self) -> Optional[DataProvenance]:
return self.get_asset_provenance(self.asset_key)

@property
def provenance_by_asset_key(self) -> Mapping[str, Optional[DataProvenance]]:
"""TODO."""
pass
def provenance_by_asset_key(self) -> Mapping[AssetKey, Optional[DataProvenance]]:
provenance_map = {}
for key in self.asset_keys:
provenance_map[key] = self.get_asset_provenance(key)

return provenance_map

@property
def code_version(self) -> Optional[str]:
"""TODO."""
pass
return self.get_assets_code_version([self.asset_key])[self.asset_key]

@property
def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]:
"""TODO."""
pass
def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]:
return self.get_assets_code_version(self.asset_keys)

@public
@property
Expand Down Expand Up @@ -1460,24 +1457,29 @@ def assets_def(self) -> AssetsDefinition:
return self._op_execution_context.assets_def

@public
# TODO confirm semantics in the presence of asset subsetting
# seems like there should be both "asset_keys" and "selected_asset_keys"
@property
def selected_asset_keys(self) -> AbstractSet[AssetKey]:
return self._op_execution_context.selected_asset_keys

@public
@experimental
def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]:
# TODO - confirm if this method is necessary with both provenance and
# provenance_by_asset_key
return self._op_execution_context.get_asset_provenance(asset_key)

@public
# TODO - method naming. this needs work
def get_assets_code_version(
self, asset_keys: Sequence[AssetKey]
) -> Mapping[AssetKey, Optional[str]]:
return self.op_execution_context.instance.get_latest_materialization_code_versions(
asset_keys
)

@property
def asset_check_spec(self) -> AssetCheckSpec:
return self._op_execution_context.asset_check_spec

@public
def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange:
"""TODO."""
"""TODO - implement in stacked pr."""
pass

0 comments on commit 4694660

Please sign in to comment.