From 46946609719dfa6df1b27bf7eeddfb49572a3d2f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 09:30:01 -0400 Subject: [PATCH] add method impls to asset context --- .../_core/execution/context/compute.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 837f92604f65c..68a2c7ac35ecb 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1272,9 +1272,7 @@ def asset_check_spec(self) -> AssetCheckSpec: PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead" INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" -OUTPUT_METADATA_ALT = ( # TODO - fix this recommendation, as MaterializationResult does not exist - "return MaterializationResult from the asset instead" -) +OUTPUT_METADATA_ALT = "return MaterializeResult from the asset instead" DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { "add_output_metadata": OUTPUT_METADATA_ALT, @@ -1300,9 +1298,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "has_tag": "use dagster_run.has_tag instead", "get_tag": "use dagster_run.get_tag instead", "run_tags": "use dagster_run.tags instead", - "set_data_version": ( - "use MaterializationResult instead" - ), # TODO - fix this recommendation, as MaterializationResult does not exist + "set_data_version": "use MaterializeResult instead", } @@ -1386,19 +1382,20 @@ def provenance(self) -> Optional[DataProvenance]: return self.get_asset_provenance(self.asset_key) @property - def provenance_by_asset_key(self) -> Mapping[str, Optional[DataProvenance]]: - """TODO.""" - pass + def provenance_by_asset_key(self) -> Mapping[AssetKey, Optional[DataProvenance]]: + provenance_map = {} + for key in self.asset_keys: + provenance_map[key] = self.get_asset_provenance(key) + + return provenance_map @property def code_version(self) -> Optional[str]: - """TODO.""" - pass + return self.get_assets_code_version([self.asset_key])[self.asset_key] @property - def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: - """TODO.""" - pass + def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]: + return self.get_assets_code_version(self.asset_keys) @public @property @@ -1460,8 +1457,6 @@ def assets_def(self) -> AssetsDefinition: return self._op_execution_context.assets_def @public - # TODO confirm semantics in the presence of asset subsetting - # seems like there should be both "asset_keys" and "selected_asset_keys" @property def selected_asset_keys(self) -> AbstractSet[AssetKey]: return self._op_execution_context.selected_asset_keys @@ -1469,15 +1464,22 @@ def selected_asset_keys(self) -> AbstractSet[AssetKey]: @public @experimental def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: - # TODO - confirm if this method is necessary with both provenance and - # provenance_by_asset_key return self._op_execution_context.get_asset_provenance(asset_key) + @public + # TODO - method naming. this needs work + def get_assets_code_version( + self, asset_keys: Sequence[AssetKey] + ) -> Mapping[AssetKey, Optional[str]]: + return self.op_execution_context.instance.get_latest_materialization_code_versions( + asset_keys + ) + @property def asset_check_spec(self) -> AssetCheckSpec: return self._op_execution_context.asset_check_spec @public def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: - """TODO.""" + """TODO - implement in stacked pr.""" pass