From bdf4d781b1b0d5655176e5869b83a24fbb9a9ae7 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 13 Sep 2023 13:10:16 -0400 Subject: [PATCH 01/31] start interface to begin discussion --- .../dagster-ext/dagster_ext/__init__.py | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index 38366a039aeaf..f99ceb9463f06 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -621,6 +621,80 @@ def make_channel( # ######################## +class IContext(ABC): + """Base class for asset context implemented by AssetExecutionContext and ExtContext.""" + + @property + @abstractmethod + def is_asset_step(self) -> bool: + """TODO.""" + + @property + @abstractmethod + def asset_key(self) -> str: + """TODO.""" + + @property + @abstractmethod + def asset_keys(self) -> Sequence[str]: + """TODO.""" + + @property + @abstractmethod + def provenance(self) -> Optional[ExtDataProvenance]: + """TODO.""" + + @property + @abstractmethod + def provenance_by_asset_key(self) -> Mapping[str, Optional[ExtDataProvenance]]: + """TODO.""" + + @property + @abstractmethod + def code_version(self) -> Optional[str]: + """TODO.""" + + @property + @abstractmethod + def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: + """TODO.""" + + @property + @abstractmethod + def is_partition_step(self) -> bool: + """TODO.""" + + @property + @abstractmethod + def partition_key(self) -> str: + """TODO.""" + + @property + @abstractmethod + def partition_key_range(self) -> Optional["ExtPartitionKeyRange"]: + """TODO.""" + + @property + @abstractmethod + def partition_time_window(self) -> Optional["ExtTimeWindow"]: + """TODO.""" + + @property + @abstractmethod + def run_id(self) -> str: + """TODO.""" + + @property + @abstractmethod + def job_name(self) -> Optional[str]: + """TODO.""" + + @property + @abstractmethod + def retry_number(self) -> int: + """TODO.""" + + def init_dagster_ext( *, context_loader: Optional[ExtContextLoader] = None, @@ -648,7 +722,7 @@ def init_dagster_ext( return context -class ExtContext: +class ExtContext(IContext): _instance: ClassVar[Optional["ExtContext"]] = None @classmethod From 712da7d6d28f30f69bc3d1052e2999abd2507c2d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 16:02:30 -0400 Subject: [PATCH 02/31] revert trying to install dagster ext --- python_modules/dagster/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/tox.ini b/python_modules/dagster/tox.ini index f12869179d583..881f50bf2cdef 100644 --- a/python_modules/dagster/tox.ini +++ b/python_modules/dagster/tox.ini @@ -16,7 +16,7 @@ deps = general_tests_old_protobuf: protobuf<4 -e ../dagster-test -e .[mypy,test,pyright] - -e ../dagster-ext + ; -e ../dagster-ext allowlist_externals = /bin/bash commands = From 1a840051710658de6540f147cc784e2a5344c97f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 17:02:17 -0400 Subject: [PATCH 03/31] dagster ext dep attempt 2 --- python_modules/dagster/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/tox.ini b/python_modules/dagster/tox.ini index 881f50bf2cdef..f12869179d583 100644 --- a/python_modules/dagster/tox.ini +++ b/python_modules/dagster/tox.ini @@ -16,7 +16,7 @@ deps = general_tests_old_protobuf: protobuf<4 -e ../dagster-test -e .[mypy,test,pyright] - ; -e ../dagster-ext + -e ../dagster-ext allowlist_externals = /bin/bash commands = From e77fce0acebd54a987396bc38402189e9fcfb6ed Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 17:05:14 -0400 Subject: [PATCH 04/31] revert --- python_modules/dagster/setup.py | 8 ++++---- python_modules/dagster/tox.ini | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py index 1f1df15267ae8..300febbe19675 100644 --- a/python_modules/dagster/setup.py +++ b/python_modules/dagster/setup.py @@ -28,9 +28,9 @@ def get_version() -> str: # grpcio 1.44.0 is the min version compatible with both protobuf 3 and 4 GRPC_VERSION_FLOOR = "1.44.0" -ver = get_version() -# dont pin dev installs to avoid pip dep resolver issues -pin = "" if ver == "1!0+dev" else f"=={ver}" +# ver = get_version() +# # dont pin dev installs to avoid pip dep resolver issues +# pin = "" if ver == "1!0+dev" else f"=={ver}" setup( name="dagster", @@ -107,7 +107,7 @@ def get_version() -> str: "universal_pathlib", # https://github.com/pydantic/pydantic/issues/5821 "pydantic != 1.10.7,<2.0.0", - f"dagster-ext-process{pin}", + # f"dagster-ext-process{pin}", ], extras_require={ "docker": ["docker"], diff --git a/python_modules/dagster/tox.ini b/python_modules/dagster/tox.ini index f12869179d583..881f50bf2cdef 100644 --- a/python_modules/dagster/tox.ini +++ b/python_modules/dagster/tox.ini @@ -16,7 +16,7 @@ deps = general_tests_old_protobuf: protobuf<4 -e ../dagster-test -e .[mypy,test,pyright] - -e ../dagster-ext + ; -e ../dagster-ext allowlist_externals = /bin/bash commands = From 9b7a1872ca3955b42a2d39c135f018eac10a2425 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 08:57:50 -0400 Subject: [PATCH 05/31] pr comments, cleanup --- .../dagster-ext/dagster_ext/__init__.py | 66 ++++++++----------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index f99ceb9463f06..5b47d8477dad6 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -626,73 +626,63 @@ class IContext(ABC): @property @abstractmethod - def is_asset_step(self) -> bool: - """TODO.""" - - @property - @abstractmethod - def asset_key(self) -> str: - """TODO.""" - - @property - @abstractmethod - def asset_keys(self) -> Sequence[str]: - """TODO.""" - - @property - @abstractmethod - def provenance(self) -> Optional[ExtDataProvenance]: - """TODO.""" + def asset_key(self): + """The AssetKey for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def provenance_by_asset_key(self) -> Mapping[str, Optional[ExtDataProvenance]]: - """TODO.""" + def asset_keys(self): + """The AssetKeys for the asset being materialized. If no asset is being materialized, errors.""" @property @abstractmethod - def code_version(self) -> Optional[str]: - """TODO.""" + def provenance(self): + """The data provenance for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: - """TODO.""" + def provenance_by_asset_key(self): + """A dictionary of data provenance for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ @property @abstractmethod - def is_partition_step(self) -> bool: - """TODO.""" + def code_version(self): + """The code version for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def partition_key(self) -> str: - """TODO.""" + def code_version_by_asset_key(self): + """A dictionary of code versions for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ @property @abstractmethod - def partition_key_range(self) -> Optional["ExtPartitionKeyRange"]: - """TODO.""" - - @property - @abstractmethod - def partition_time_window(self) -> Optional["ExtTimeWindow"]: - """TODO.""" + def is_partitioned(self) -> bool: + """True if the current execution is partitioned.""" @property @abstractmethod def run_id(self) -> str: - """TODO.""" + """The run id of the current execution.""" @property @abstractmethod def job_name(self) -> Optional[str]: - """TODO.""" + """The name of the job that is executing.""" @property @abstractmethod def retry_number(self) -> int: - """TODO.""" + """The number of retries of this execution.""" def init_dagster_ext( @@ -804,7 +794,7 @@ def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: return code_version_by_asset_key @property - def is_partition_step(self) -> bool: + def is_partitioned(self) -> bool: return self._data["partition_key_range"] is not None @property From 819d90cdcf9052eab0cb48786e3f6c05f84a4db3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:17:40 -0400 Subject: [PATCH 06/31] test fix --- .../dagster-ext/dagster_ext_tests/test_context.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_context.py b/python_modules/dagster-ext/dagster_ext_tests/test_context.py index 4294e53bbeda7..aff0318df025f 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_context.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_context.py @@ -120,7 +120,7 @@ def test_multi_asset_context(): def test_no_partition_context(): context = _make_external_execution_context() - assert not context.is_partition_step + assert not context.is_partitioned _assert_undefined(context, "partition_key") _assert_undefined(context, "partition_key_range") _assert_undefined(context, "partition_time_window") @@ -135,7 +135,7 @@ def test_single_partition_context(): partition_time_window=None, ) - assert context.is_partition_step + assert context.is_partitioned assert context.partition_key == "foo" assert context.partition_key_range == partition_key_range assert context.partition_time_window is None @@ -151,7 +151,7 @@ def test_multiple_partition_context(): partition_time_window=time_window, ) - assert context.is_partition_step + assert context.is_partitioned _assert_undefined(context, "partition_key") assert context.partition_key_range == partition_key_range assert context.partition_time_window == time_window From c278d0a8e18c343f1310a37de672f7f2ee79ca2b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:24:42 -0400 Subject: [PATCH 07/31] fix conflict --- python_modules/dagster/setup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py index 300febbe19675..1f1df15267ae8 100644 --- a/python_modules/dagster/setup.py +++ b/python_modules/dagster/setup.py @@ -28,9 +28,9 @@ def get_version() -> str: # grpcio 1.44.0 is the min version compatible with both protobuf 3 and 4 GRPC_VERSION_FLOOR = "1.44.0" -# ver = get_version() -# # dont pin dev installs to avoid pip dep resolver issues -# pin = "" if ver == "1!0+dev" else f"=={ver}" +ver = get_version() +# dont pin dev installs to avoid pip dep resolver issues +pin = "" if ver == "1!0+dev" else f"=={ver}" setup( name="dagster", @@ -107,7 +107,7 @@ def get_version() -> str: "universal_pathlib", # https://github.com/pydantic/pydantic/issues/5821 "pydantic != 1.10.7,<2.0.0", - # f"dagster-ext-process{pin}", + f"dagster-ext-process{pin}", ], extras_require={ "docker": ["docker"], From 65b6c1e4456f80827c1ac863736a577fc0a04fbe Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:25:26 -0400 Subject: [PATCH 08/31] fix conflixt --- python_modules/dagster/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/tox.ini b/python_modules/dagster/tox.ini index 881f50bf2cdef..f12869179d583 100644 --- a/python_modules/dagster/tox.ini +++ b/python_modules/dagster/tox.ini @@ -16,7 +16,7 @@ deps = general_tests_old_protobuf: protobuf<4 -e ../dagster-test -e .[mypy,test,pyright] - ; -e ../dagster-ext + -e ../dagster-ext allowlist_externals = /bin/bash commands = From b89018efe36ddea410f23318ac6f578f2f576b25 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Sep 2023 12:26:17 -0400 Subject: [PATCH 09/31] subclass the context --- .../_core/execution/context/compute.py | 472 +++++++++++++++++- 1 file changed, 462 insertions(+), 10 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d76c79afe3a8d..40dd4dd53dd76 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC, ABCMeta, abstractmethod from typing import ( AbstractSet, Any, @@ -12,8 +12,6 @@ cast, ) -from typing_extensions import TypeAlias - import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -46,11 +44,17 @@ from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun from dagster._utils.forked_pdb import ForkedPdb +from dagster._utils.warnings import deprecation_warning from .system import StepExecutionContext -class AbstractComputeExecutionContext(ABC): +# This metaclass has to exist for OpExecutionContext to have a metaclass +class AbstractComputeMetaclass(ABCMeta): + pass + + +class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" @abstractmethod @@ -97,7 +101,25 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class OpExecutionContext(AbstractComputeExecutionContext): +class OpExecutionContextMetaClass(AbstractComputeMetaclass): + def __instancecheck__(cls, instance) -> bool: + # This makes isinstance(context, OpExecutionContext) throw a deprecation warning when + # context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext + # has been split into it's own class in 1.7.0 + if isinstance(instance, AssetExecutionContext): + deprecation_warning( + subject="AssetExecutionContext", + additional_warn_text=( + "Starting in version 1.7.0 AssetExecutionContext will no longer be a subclass" + " of OpExecutionContext." + ), + breaking_version="1.7.0", + stacklevel=1, + ) + return super().__instancecheck__(instance) + + +class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. @@ -1223,8 +1245,438 @@ def asset_check_spec(self) -> AssetCheckSpec: return asset_checks_def.spec -# actually forking the object type for assets is tricky for users in the cases of: -# * manually constructing ops to make AssetsDefinitions -# * having ops in a graph that form a graph backed asset -# so we have a single type that users can call by their preferred name where appropriate -AssetExecutionContext: TypeAlias = OpExecutionContext +############################ +##### AssetExecutionContext +############################ + +# To preserve backwards compatibility, AssetExecutionContext is being written as a subclass of +# OpExecutionContext until we can split it into its own class. All methods on OpExecutionContext +# that will not be included in the eventual AssetExecutionContext will be marked with deprecation +# warnings according to how the user should access that functionality in the future +# +# The following sets/maps are used to determine which methods need deprecation warnings, and how to +# direct users to the correct method to use + + +OP_EXECUTION_CONTEXT_ONLY_METHODS = set( + [ + "describe_op", + "file_manager", + "has_assets_def", + "get_mapping_key", + # "get_step_execution_context", # used by internals + "job_def", + "node_handle", + "op", + "op_config", + # "op_def", # used by internals + "op_handle", + "step_launcher", + # "has_events", # used by internals + "consume_events", + ] +) + + +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead" +INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" +OUTPUT_METADATA_ALT = "return MaterializeResult from the asset instead" + +DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { + "add_output_metadata": OUTPUT_METADATA_ALT, + "asset_key_for_input": INPUT_OUTPUT_ALT, + "asset_key_for_output": INPUT_OUTPUT_ALT, + "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT, + "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata", + # "merge_output_metadata": OUTPUT_METADATA_ALT, # TODO - this method doesn't exist, check if it has a different name + "output_for_asset_key": INPUT_OUTPUT_ALT, + "selected_output_names": INPUT_OUTPUT_ALT, +} + +ALTERNATE_AVAILABLE_METHODS = { + "has_tag": ( + "use dagster_run.has_tag instead" + ), # TODO - was dagster_run intended to be a method/attr on AssetExecutionContext? + "get_tag": "use dagster_run.get_tag instead", + "run_tags": "use dagster_run.tags instead", + "set_data_version": "use MaterializeResult instead", +} + +# TODO - add AssetCheck related methods to this list + + +def _get_deprecation_kwargs(attr: str): + deprecation_kwargs = {"breaking_version": "1.7.0"} + deprecation_kwargs["subject"] = f"AssetExecutionContext.{attr}" + + if attr in OP_EXECUTION_CONTEXT_ONLY_METHODS: + deprecation_kwargs["additional_warn_text"] = ( + f"You have called the deprecated method {attr} on AssetExecutionContext. Use" + " the underlying OpExecutionContext instead by calling" + f" context.op_execution_context.{attr}." + ) + + if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: + alt = DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS[attr] + deprecation_kwargs["additional_warn_text"] = ( + f"You have called method {attr} on AssetExecutionContext that is oriented" + f" around I/O managers. If you not using I/O managers we suggest you {alt}. If" + " you are using I/O managers the method still exists at" + f" context.op_execution_context.{attr}." + ) + + if attr in ALTERNATE_AVAILABLE_METHODS: + deprecation_kwargs["additional_warn_text"] = f"Instead {ALTERNATE_AVAILABLE_METHODS[attr]}." + + return deprecation_kwargs + + +class AssetExecutionContext(OpExecutionContext): + def __init__(self, op_execution_context: OpExecutionContext) -> None: + self._op_execution_context = check.inst_param( + op_execution_context, "op_execution_context", OpExecutionContext + ) + self._step_execution_context = ( + self._op_execution_context._step_execution_context # noqa: SLF001 + ) + + @public + @property + def op_execution_context(self) -> OpExecutionContext: + return self._op_execution_context + + # IContext interface methods + + @property + def is_asset_step(self) -> bool: + return self.op_execution_context.has_assets_def + + @public + @property + def asset_key(self) -> AssetKey: + return self._op_execution_context.asset_key + + @property + def asset_keys(self) -> Sequence[AssetKey]: + return list(self.op_execution_context.assets_def.keys_by_output_name.values()) + + @property + def provenance(self) -> Optional[DataProvenance]: + return self.get_asset_provenance(self.asset_key) + + @property + def provenance_by_asset_key(self) -> Mapping[AssetKey, Optional[DataProvenance]]: + provenance_map = {} + for key in self.asset_keys: + provenance_map[key] = self.get_asset_provenance(key) + + return provenance_map + + @property + def code_version(self) -> Optional[str]: + return self.get_assets_code_version([self.asset_key])[self.asset_key] + + @property + def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]: + return self.get_assets_code_version(self.asset_keys) + + @public + @property + def is_partition_step(self) -> bool: + return self._op_execution_context.has_partition_key + + @property + def partition_key(self) -> str: + return self.op_execution_context.partition_key + + @public + @property + def partition_key_range(self) -> PartitionKeyRange: + return self._op_execution_context.asset_partition_key_range + + @property + def partition_time_window(self) -> TimeWindow: + return self.op_execution_context.partition_time_window + + @public + @property + def run_id(self) -> str: + return self._op_execution_context.run_id + + @property + def job_name(self) -> Optional[str]: + return self.op_execution_context.job_name + + @property + def retry_number(self) -> int: + return self.op_execution_context.retry_number + + # Additional methods + + @public + @property + def dagster_run(self) -> DagsterRun: + """PipelineRun: The current pipeline run.""" + return self._step_execution_context.dagster_run + + @public + @property + def pdb(self) -> ForkedPdb: + return self._op_execution_context.pdb + + @public + @property + def log(self) -> DagsterLogManager: + """DagsterLogManager: The log manager available in the execution context.""" + return self._op_execution_context.log + + @public + def log_event(self, event: UserEvent) -> None: + return self._op_execution_context.log_event(event) + + @public + @property + def assets_def(self) -> AssetsDefinition: + return self._op_execution_context.assets_def + + @public + @property + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self._op_execution_context.selected_asset_keys + + @public + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self._op_execution_context.get_asset_provenance(asset_key) + + @public + # TODO - method naming. this needs work + def get_assets_code_version( + self, asset_keys: Sequence[AssetKey] + ) -> Mapping[AssetKey, Optional[str]]: + return self.op_execution_context.instance.get_latest_materialization_code_versions( + asset_keys + ) + + @property + def asset_check_spec(self) -> AssetCheckSpec: + return self._op_execution_context.asset_check_spec + + @public + def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: + """TODO - implement in stacked pr.""" + pass + + # deprecated methods. All remaining methods on OpExecutionContext should be here with the + # appropriate deprecation warning + + @deprecated( + **_get_deprecation_kwargs("op_config"), + ) + @public + @property + def op_config(self) -> Any: + return super().op_config + + @deprecated( + **_get_deprecation_kwargs("file_manager"), + ) + @property + def file_manager(self): + return super().file_manager + + @deprecated( + **_get_deprecation_kwargs("has_assets_def"), + ) + @public + @property + def has_assets_def(self) -> bool: + return super().has_assets_def + + @deprecated( + **_get_deprecation_kwargs("get_mapping_key"), + ) + @public + def get_mapping_key(self) -> Optional[str]: + return super().get_mapping_key() + + @deprecated( + **_get_deprecation_kwargs("job_def"), + ) + @public + @property + def job_def(self) -> JobDefinition: + return super().job_def + + @deprecated( + **_get_deprecation_kwargs("node_handle"), + ) + @property + def node_handle(self) -> NodeHandle: + return super().node_handle + + @deprecated( + **_get_deprecation_kwargs("op"), + ) + @property + def op(self) -> Node: + return super().op + + @deprecated( + **_get_deprecation_kwargs("describe_op"), + ) + def describe_op(self): + return super().describe_op() + + @deprecated( + **_get_deprecation_kwargs("op_handle"), + ) + @property + def op_handle(self) -> NodeHandle: + return super().op_handle + + @deprecated( + **_get_deprecation_kwargs("step_launcher"), + ) + @property + def step_launcher(self) -> Optional[StepLauncher]: + return super().step_launcher + + @deprecated( + **_get_deprecation_kwargs("consume_events"), + ) + def consume_events(self) -> Iterator[DagsterEvent]: + return super().consume_events() + + @deprecated( + **_get_deprecation_kwargs("add_output_metadata"), + ) + @public + def add_output_metadata( + self, + metadata: Mapping[str, Any], + output_name: Optional[str] = None, + mapping_key: Optional[str] = None, + ) -> None: + return super().add_output_metadata( + metadata=metadata, output_name=output_name, mapping_key=mapping_key + ) + + @deprecated( + **_get_deprecation_kwargs("asset_key_for_input"), + ) + @public + def asset_key_for_input(self, input_name: str) -> AssetKey: + return super().asset_key_for_input(input_name=input_name) + + @deprecated( + **_get_deprecation_kwargs("asset_key_for_output"), + ) + @public + def asset_key_for_output(self, output_name: str = "result") -> AssetKey: + return super().asset_key_for_output(output_name=output_name) + + @deprecated( + **_get_deprecation_kwargs("asset_partition_key_for_input"), + ) + @public + def asset_partition_key_for_input(self, input_name: str) -> str: + return super().asset_partition_key_for_input(input_name=input_name) + + @deprecated( + **_get_deprecation_kwargs("asset_partition_key_for_output"), + ) + @public + def asset_partition_key_for_output(self, output_name: str = "result") -> str: + return super().asset_partition_key_for_output(output_name=output_name) + + @deprecated( + **_get_deprecation_kwargs("asset_partition_key_range_for_input"), + ) + @public + def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + return super().asset_partition_key_range_for_input(input_name=input_name) + + @deprecated( + **_get_deprecation_kwargs("asset_partition_key_range_for_output"), + ) + @public + def asset_partition_key_range_for_output( + self, output_name: str = "result" + ) -> PartitionKeyRange: + return super().asset_partition_key_range_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_input")) + @public + def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: + return super().asset_partition_keys_for_input(input_name=input_name) + + @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output")) + @public + def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: + return super().asset_partition_keys_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_input")) + @public + def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: + return super().asset_partitions_time_window_for_input(input_name=input_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output")) + @public + def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: + return super().asset_partitions_time_window_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_input")) + @public + def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: + return super().asset_partitions_def_for_input(input_name=input_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output")) + @public + def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: + return super().asset_partitions_def_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("get_output_metadata")) + def get_output_metadata( + self, output_name: str, mapping_key: Optional[str] = None + ) -> Optional[Mapping[str, Any]]: + return super().get_output_metadata(output_name=output_name, mapping_key=mapping_key) + + @deprecated(**_get_deprecation_kwargs("output_for_asset_key")) + @public + def output_for_asset_key(self, asset_key: AssetKey) -> str: + return super().output_for_asset_key(asset_key=asset_key) + + @deprecated(**_get_deprecation_kwargs("selected_output_names")) + @public + @property + def selected_output_names(self) -> AbstractSet[str]: + return super().selected_output_names + + @deprecated(**_get_deprecation_kwargs("has_tag")) + @public + def has_tag(self, key: str) -> bool: + return super().has_tag(key=key) + + @deprecated(**_get_deprecation_kwargs("get_tag")) + @public + def get_tag(self, key: str) -> Optional[str]: + return super().get_tag(key=key) + + @deprecated(**_get_deprecation_kwargs("run_tags")) + @property + def run_tags(self) -> Mapping[str, str]: + return super().run_tags + + @deprecated(**_get_deprecation_kwargs("set_data_version")) + def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: + return super().set_data_version(asset_key=asset_key, data_version=data_version) From 09c0e8468c64dfc1f67c4fd66a6a2df97ba98171 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Sep 2023 12:28:24 -0400 Subject: [PATCH 10/31] collapse lines --- .../dagster/dagster/_core/execution/context/compute.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 40dd4dd53dd76..30c999f8fb67a 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1479,17 +1479,13 @@ def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKey # deprecated methods. All remaining methods on OpExecutionContext should be here with the # appropriate deprecation warning - @deprecated( - **_get_deprecation_kwargs("op_config"), - ) + @deprecated(**_get_deprecation_kwargs("op_config")) @public @property def op_config(self) -> Any: return super().op_config - @deprecated( - **_get_deprecation_kwargs("file_manager"), - ) + @deprecated(**_get_deprecation_kwargs("file_manager")) @property def file_manager(self): return super().file_manager From c289ff9a8c42819fc62d509c0c18f9c9dfb98f66 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Sep 2023 12:29:58 -0400 Subject: [PATCH 11/31] lines --- .../_core/execution/context/compute.py | 64 +++++-------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 30c999f8fb67a..6ee9d3c98ffe7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1490,72 +1490,52 @@ def op_config(self) -> Any: def file_manager(self): return super().file_manager - @deprecated( - **_get_deprecation_kwargs("has_assets_def"), - ) + @deprecated(**_get_deprecation_kwargs("has_assets_def")) @public @property def has_assets_def(self) -> bool: return super().has_assets_def - @deprecated( - **_get_deprecation_kwargs("get_mapping_key"), - ) + @deprecated(**_get_deprecation_kwargs("get_mapping_key")) @public def get_mapping_key(self) -> Optional[str]: return super().get_mapping_key() - @deprecated( - **_get_deprecation_kwargs("job_def"), - ) + @deprecated(**_get_deprecation_kwargs("job_def")) @public @property def job_def(self) -> JobDefinition: return super().job_def - @deprecated( - **_get_deprecation_kwargs("node_handle"), - ) + @deprecated(**_get_deprecation_kwargs("node_handle")) @property def node_handle(self) -> NodeHandle: return super().node_handle - @deprecated( - **_get_deprecation_kwargs("op"), - ) + @deprecated(**_get_deprecation_kwargs("op")) @property def op(self) -> Node: return super().op - @deprecated( - **_get_deprecation_kwargs("describe_op"), - ) + @deprecated(**_get_deprecation_kwargs("describe_op")) def describe_op(self): return super().describe_op() - @deprecated( - **_get_deprecation_kwargs("op_handle"), - ) + @deprecated(**_get_deprecation_kwargs("op_handle")) @property def op_handle(self) -> NodeHandle: return super().op_handle - @deprecated( - **_get_deprecation_kwargs("step_launcher"), - ) + @deprecated(**_get_deprecation_kwargs("step_launcher")) @property def step_launcher(self) -> Optional[StepLauncher]: return super().step_launcher - @deprecated( - **_get_deprecation_kwargs("consume_events"), - ) + @deprecated(**_get_deprecation_kwargs("consume_events")) def consume_events(self) -> Iterator[DagsterEvent]: return super().consume_events() - @deprecated( - **_get_deprecation_kwargs("add_output_metadata"), - ) + @deprecated(**_get_deprecation_kwargs("add_output_metadata")) @public def add_output_metadata( self, @@ -1567,44 +1547,32 @@ def add_output_metadata( metadata=metadata, output_name=output_name, mapping_key=mapping_key ) - @deprecated( - **_get_deprecation_kwargs("asset_key_for_input"), - ) + @deprecated(**_get_deprecation_kwargs("asset_key_for_input")) @public def asset_key_for_input(self, input_name: str) -> AssetKey: return super().asset_key_for_input(input_name=input_name) - @deprecated( - **_get_deprecation_kwargs("asset_key_for_output"), - ) + @deprecated(**_get_deprecation_kwargs("asset_key_for_output")) @public def asset_key_for_output(self, output_name: str = "result") -> AssetKey: return super().asset_key_for_output(output_name=output_name) - @deprecated( - **_get_deprecation_kwargs("asset_partition_key_for_input"), - ) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_input")) @public def asset_partition_key_for_input(self, input_name: str) -> str: return super().asset_partition_key_for_input(input_name=input_name) - @deprecated( - **_get_deprecation_kwargs("asset_partition_key_for_output"), - ) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output")) @public def asset_partition_key_for_output(self, output_name: str = "result") -> str: return super().asset_partition_key_for_output(output_name=output_name) - @deprecated( - **_get_deprecation_kwargs("asset_partition_key_range_for_input"), - ) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_input")) @public def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: return super().asset_partition_key_range_for_input(input_name=input_name) - @deprecated( - **_get_deprecation_kwargs("asset_partition_key_range_for_output"), - ) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output")) @public def asset_partition_key_range_for_output( self, output_name: str = "result" From eb0482860f81b04c8524e713e677dfd406d7f672 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Sep 2023 14:04:23 -0400 Subject: [PATCH 12/31] update context with methods found during testing --- .../_core/execution/context/compute.py | 135 +++++++++++++----- 1 file changed, 100 insertions(+), 35 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 6ee9d3c98ffe7..19d1a79b0a3cf 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1264,15 +1264,15 @@ def asset_check_spec(self) -> AssetCheckSpec: "file_manager", "has_assets_def", "get_mapping_key", - # "get_step_execution_context", # used by internals + "get_step_execution_context", # TODO - used by internals "job_def", "node_handle", "op", "op_config", - # "op_def", # used by internals + "op_def", # TODO - used by internals "op_handle", "step_launcher", - # "has_events", # used by internals + "has_events", # TODO - used by internals "consume_events", ] ) @@ -1286,10 +1286,12 @@ def asset_check_spec(self) -> AssetCheckSpec: "add_output_metadata": OUTPUT_METADATA_ALT, "asset_key_for_input": INPUT_OUTPUT_ALT, "asset_key_for_output": INPUT_OUTPUT_ALT, + "has_partition_key": "use is_partition_step instead.", "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT, "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT, "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT, "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range": PARTITION_KEY_RANGE_AS_ALT, "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT, "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT, "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT, @@ -1309,6 +1311,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "get_tag": "use dagster_run.get_tag instead", "run_tags": "use dagster_run.tags instead", "set_data_version": "use MaterializeResult instead", + "run": "use dagster_run instead.", } # TODO - add AssetCheck related methods to this list @@ -1422,6 +1425,12 @@ def retry_number(self) -> int: # Additional methods + @public + @property + def instance(self) -> DagsterInstance: + """DagsterInstance: The current Dagster instance.""" + return self._step_execution_context.instance + @public @property def dagster_run(self) -> DagsterRun: @@ -1476,64 +1485,85 @@ def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKey """TODO - implement in stacked pr.""" pass + # TODO the below methods weren't originally part of the deprecated list, but are also not part + # of the context interface. What should we do with them? + + @public + @property + def resources(self) -> Any: + """Resources: The currently available resources.""" + return self._step_execution_context.resources + + @public + @property + def run_config(self) -> Mapping[str, object]: + """dict: The run config for the current execution.""" + return self._step_execution_context.run_config + # deprecated methods. All remaining methods on OpExecutionContext should be here with the # appropriate deprecation warning + @deprecated(**_get_deprecation_kwargs("op_def")) + @public + @property + def op_def(self) -> OpDefinition: + return self.op_execution_context.op_def + @deprecated(**_get_deprecation_kwargs("op_config")) @public @property def op_config(self) -> Any: - return super().op_config + return self.op_execution_context.op_config @deprecated(**_get_deprecation_kwargs("file_manager")) @property def file_manager(self): - return super().file_manager + return self.op_execution_context.file_manager @deprecated(**_get_deprecation_kwargs("has_assets_def")) @public @property def has_assets_def(self) -> bool: - return super().has_assets_def + return self.op_execution_context.has_assets_def @deprecated(**_get_deprecation_kwargs("get_mapping_key")) @public def get_mapping_key(self) -> Optional[str]: - return super().get_mapping_key() + return self.op_execution_context.get_mapping_key() @deprecated(**_get_deprecation_kwargs("job_def")) @public @property def job_def(self) -> JobDefinition: - return super().job_def + return self.op_execution_context.job_def @deprecated(**_get_deprecation_kwargs("node_handle")) @property def node_handle(self) -> NodeHandle: - return super().node_handle + return self.op_execution_context.node_handle @deprecated(**_get_deprecation_kwargs("op")) @property def op(self) -> Node: - return super().op + return self.op_execution_context.op @deprecated(**_get_deprecation_kwargs("describe_op")) def describe_op(self): - return super().describe_op() + return self.op_execution_context.describe_op() @deprecated(**_get_deprecation_kwargs("op_handle")) @property def op_handle(self) -> NodeHandle: - return super().op_handle + return self.op_execution_context.op_handle @deprecated(**_get_deprecation_kwargs("step_launcher")) @property def step_launcher(self) -> Optional[StepLauncher]: - return super().step_launcher + return self.op_execution_context.step_launcher @deprecated(**_get_deprecation_kwargs("consume_events")) def consume_events(self) -> Iterator[DagsterEvent]: - return super().consume_events() + return self.op_execution_context.consume_events() @deprecated(**_get_deprecation_kwargs("add_output_metadata")) @public @@ -1543,104 +1573,139 @@ def add_output_metadata( output_name: Optional[str] = None, mapping_key: Optional[str] = None, ) -> None: - return super().add_output_metadata( + return self.op_execution_context.add_output_metadata( metadata=metadata, output_name=output_name, mapping_key=mapping_key ) @deprecated(**_get_deprecation_kwargs("asset_key_for_input")) @public def asset_key_for_input(self, input_name: str) -> AssetKey: - return super().asset_key_for_input(input_name=input_name) + return self.op_execution_context.asset_key_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_key_for_output")) @public def asset_key_for_output(self, output_name: str = "result") -> AssetKey: - return super().asset_key_for_output(output_name=output_name) + return self.op_execution_context.asset_key_for_output(output_name=output_name) @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_input")) @public def asset_partition_key_for_input(self, input_name: str) -> str: - return super().asset_partition_key_for_input(input_name=input_name) + return self.op_execution_context.asset_partition_key_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output")) @public def asset_partition_key_for_output(self, output_name: str = "result") -> str: - return super().asset_partition_key_for_output(output_name=output_name) + return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_input")) @public def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - return super().asset_partition_key_range_for_input(input_name=input_name) + return self.op_execution_context.asset_partition_key_range_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output")) @public def asset_partition_key_range_for_output( self, output_name: str = "result" ) -> PartitionKeyRange: - return super().asset_partition_key_range_for_output(output_name=output_name) + return self.op_execution_context.asset_partition_key_range_for_output( + output_name=output_name + ) + + @deprecated(**_get_deprecation_kwargs("has_partition_key")) + @public + @property + def has_partition_key(self) -> bool: + return self.op_execution_context.has_partition_key + + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range")) + @public + @property + def asset_partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_input")) @public def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: - return super().asset_partition_keys_for_input(input_name=input_name) + return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output")) @public def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: - return super().asset_partition_keys_for_output(output_name=output_name) + return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_input")) @public def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: - return super().asset_partitions_time_window_for_input(input_name=input_name) + return self.op_execution_context.asset_partitions_time_window_for_input( + input_name=input_name + ) @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output")) @public def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: - return super().asset_partitions_time_window_for_output(output_name=output_name) + return self.op_execution_context.asset_partitions_time_window_for_output( + output_name=output_name + ) @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_input")) @public def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: - return super().asset_partitions_def_for_input(input_name=input_name) + return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output")) @public def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: - return super().asset_partitions_def_for_output(output_name=output_name) + return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) @deprecated(**_get_deprecation_kwargs("get_output_metadata")) def get_output_metadata( self, output_name: str, mapping_key: Optional[str] = None ) -> Optional[Mapping[str, Any]]: - return super().get_output_metadata(output_name=output_name, mapping_key=mapping_key) + return self.op_execution_context.get_output_metadata( + output_name=output_name, mapping_key=mapping_key + ) @deprecated(**_get_deprecation_kwargs("output_for_asset_key")) @public def output_for_asset_key(self, asset_key: AssetKey) -> str: - return super().output_for_asset_key(asset_key=asset_key) + return self.op_execution_context.output_for_asset_key(asset_key=asset_key) @deprecated(**_get_deprecation_kwargs("selected_output_names")) @public @property def selected_output_names(self) -> AbstractSet[str]: - return super().selected_output_names + return self.op_execution_context.selected_output_names @deprecated(**_get_deprecation_kwargs("has_tag")) @public def has_tag(self, key: str) -> bool: - return super().has_tag(key=key) + return self.op_execution_context.has_tag(key=key) @deprecated(**_get_deprecation_kwargs("get_tag")) @public def get_tag(self, key: str) -> Optional[str]: - return super().get_tag(key=key) + return self.op_execution_context.get_tag(key=key) - @deprecated(**_get_deprecation_kwargs("run_tags")) @property + @deprecated(**_get_deprecation_kwargs("run_tags")) def run_tags(self) -> Mapping[str, str]: - return super().run_tags + return self.op_execution_context.run_tags @deprecated(**_get_deprecation_kwargs("set_data_version")) def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: - return super().set_data_version(asset_key=asset_key, data_version=data_version) + return self.op_execution_context.set_data_version( + asset_key=asset_key, data_version=data_version + ) + + @deprecated(**_get_deprecation_kwargs("run")) + @property + def run(self) -> DagsterRun: + return self.op_execution_context.run + + @deprecated(**_get_deprecation_kwargs("get_step_execution_context")) + def get_step_execution_context(self) -> StepExecutionContext: + return self.op_execution_context.get_step_execution_context() + + @deprecated(**_get_deprecation_kwargs("has_events")) + def has_events(self) -> bool: + return self.op_execution_context.has_events() From 5e7855e561eb205ab7fb43924f2ab4dd51b57201 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Sep 2023 16:14:17 -0400 Subject: [PATCH 13/31] change how instance check works --- .../dagster/dagster/_core/execution/context/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 19d1a79b0a3cf..712059af33616 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -106,7 +106,7 @@ def __instancecheck__(cls, instance) -> bool: # This makes isinstance(context, OpExecutionContext) throw a deprecation warning when # context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext # has been split into it's own class in 1.7.0 - if isinstance(instance, AssetExecutionContext): + if type(instance) is AssetExecutionContext: deprecation_warning( subject="AssetExecutionContext", additional_warn_text=( From 57dae89fa7f52bb86ed22e01c3cd45a56f5e25c1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 10:26:41 -0400 Subject: [PATCH 14/31] remove unneeded comments --- .../dagster/dagster/_core/execution/context/compute.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 712059af33616..a6e58fdb1f9d1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1299,15 +1299,12 @@ def asset_check_spec(self) -> AssetCheckSpec: "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT, "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT, "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata", - # "merge_output_metadata": OUTPUT_METADATA_ALT, # TODO - this method doesn't exist, check if it has a different name "output_for_asset_key": INPUT_OUTPUT_ALT, "selected_output_names": INPUT_OUTPUT_ALT, } ALTERNATE_AVAILABLE_METHODS = { - "has_tag": ( - "use dagster_run.has_tag instead" - ), # TODO - was dagster_run intended to be a method/attr on AssetExecutionContext? + "has_tag": "use dagster_run.has_tag instead", "get_tag": "use dagster_run.get_tag instead", "run_tags": "use dagster_run.tags instead", "set_data_version": "use MaterializeResult instead", From c9c3fd9bf2f6697cd6e4a20815e70b7f2ea3ffa9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 15:21:47 -0400 Subject: [PATCH 15/31] update step execution context methods --- .../dagster/_core/execution/context/compute.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index a6e58fdb1f9d1..f7389c56d4c7e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1345,9 +1345,6 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext ) - self._step_execution_context = ( - self._op_execution_context._step_execution_context # noqa: SLF001 - ) @public @property @@ -1426,13 +1423,13 @@ def retry_number(self) -> int: @property def instance(self) -> DagsterInstance: """DagsterInstance: The current Dagster instance.""" - return self._step_execution_context.instance + return self._op_execution_context.instance @public @property def dagster_run(self) -> DagsterRun: """PipelineRun: The current pipeline run.""" - return self._step_execution_context.dagster_run + return self._op_execution_context.dagster_run @public @property @@ -1489,13 +1486,13 @@ def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKey @property def resources(self) -> Any: """Resources: The currently available resources.""" - return self._step_execution_context.resources + return self._op_execution_context.resources @public @property def run_config(self) -> Mapping[str, object]: """dict: The run config for the current execution.""" - return self._step_execution_context.run_config + return self._op_execution_context.run_config # deprecated methods. All remaining methods on OpExecutionContext should be here with the # appropriate deprecation warning From 0e37585765d751e298e8fec2cf41eaee2817020a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 13:03:06 -0400 Subject: [PATCH 16/31] small pr comments --- .../dagster/_core/execution/context/compute.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index f7389c56d4c7e..d39127a8a49e3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1264,15 +1264,15 @@ def asset_check_spec(self) -> AssetCheckSpec: "file_manager", "has_assets_def", "get_mapping_key", - "get_step_execution_context", # TODO - used by internals + "get_step_execution_context", "job_def", "node_handle", "op", "op_config", - "op_def", # TODO - used by internals + "op_def", "op_handle", "step_launcher", - "has_events", # TODO - used by internals + "has_events", "consume_events", ] ) @@ -1477,10 +1477,9 @@ def asset_check_spec(self) -> AssetCheckSpec: @public def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: """TODO - implement in stacked pr.""" - pass - - # TODO the below methods weren't originally part of the deprecated list, but are also not part - # of the context interface. What should we do with them? + raise NotImplementedError( + "partition_key_range_for_asset_key not implemented in this branch" + ) @public @property From ecdefc174029bb52fb3c1631809b98810bffa9fe Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 13:08:11 -0400 Subject: [PATCH 17/31] small changes from testing --- .../dagster/dagster/_core/execution/context/compute.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d39127a8a49e3..a0c995d59b372 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1278,7 +1278,7 @@ def asset_check_spec(self) -> AssetCheckSpec: ) -PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead" +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset_key instead" INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" OUTPUT_METADATA_ALT = "return MaterializeResult from the asset instead" @@ -1305,7 +1305,7 @@ def asset_check_spec(self) -> AssetCheckSpec: ALTERNATE_AVAILABLE_METHODS = { "has_tag": "use dagster_run.has_tag instead", - "get_tag": "use dagster_run.get_tag instead", + "get_tag": "use dagster_run.tags.get instead", "run_tags": "use dagster_run.tags instead", "set_data_version": "use MaterializeResult instead", "run": "use dagster_run instead.", @@ -1398,7 +1398,7 @@ def partition_key(self) -> str: @public @property def partition_key_range(self) -> PartitionKeyRange: - return self._op_execution_context.asset_partition_key_range + return self._op_execution_context.partition_key_range @property def partition_time_window(self) -> TimeWindow: From 11273c83e8007fc7a640ed645639bc52ccb96f47 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 13:30:55 -0400 Subject: [PATCH 18/31] un-deprecate partition methods for the moment --- .../_core/execution/context/compute.py | 137 ++++++++---------- 1 file changed, 60 insertions(+), 77 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index a0c995d59b372..c875ac547dabf 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1474,13 +1474,6 @@ def get_assets_code_version( def asset_check_spec(self) -> AssetCheckSpec: return self._op_execution_context.asset_check_spec - @public - def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: - """TODO - implement in stacked pr.""" - raise NotImplementedError( - "partition_key_range_for_asset_key not implemented in this branch" - ) - @public @property def resources(self) -> Any: @@ -1493,6 +1486,66 @@ def run_config(self) -> Mapping[str, object]: """dict: The run config for the current execution.""" return self._op_execution_context.run_config + # partition methods that will be marked deprecated once we have aligned on future partition methods + + @public + def asset_partition_key_for_input(self, input_name: str) -> str: + return self.op_execution_context.asset_partition_key_for_input(input_name=input_name) + + @public + def asset_partition_key_for_output(self, output_name: str = "result") -> str: + return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) + + @public + def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_input(input_name=input_name) + + @public + def asset_partition_key_range_for_output( + self, output_name: str = "result" + ) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_output( + output_name=output_name + ) + + @public + @property + def has_partition_key(self) -> bool: + return self.op_execution_context.has_partition_key + + @public + @property + def asset_partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range + + @public + def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) + + @public + def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) + + @public + def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_input( + input_name=input_name + ) + + @public + def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_output( + output_name=output_name + ) + + @public + def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) + + @public + def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) + # deprecated methods. All remaining methods on OpExecutionContext should be here with the # appropriate deprecation warning @@ -1580,76 +1633,6 @@ def asset_key_for_input(self, input_name: str) -> AssetKey: def asset_key_for_output(self, output_name: str = "result") -> AssetKey: return self.op_execution_context.asset_key_for_output(output_name=output_name) - @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_input")) - @public - def asset_partition_key_for_input(self, input_name: str) -> str: - return self.op_execution_context.asset_partition_key_for_input(input_name=input_name) - - @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output")) - @public - def asset_partition_key_for_output(self, output_name: str = "result") -> str: - return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) - - @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_input")) - @public - def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - return self.op_execution_context.asset_partition_key_range_for_input(input_name=input_name) - - @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output")) - @public - def asset_partition_key_range_for_output( - self, output_name: str = "result" - ) -> PartitionKeyRange: - return self.op_execution_context.asset_partition_key_range_for_output( - output_name=output_name - ) - - @deprecated(**_get_deprecation_kwargs("has_partition_key")) - @public - @property - def has_partition_key(self) -> bool: - return self.op_execution_context.has_partition_key - - @deprecated(**_get_deprecation_kwargs("asset_partition_key_range")) - @public - @property - def asset_partition_key_range(self) -> PartitionKeyRange: - return self.op_execution_context.asset_partition_key_range - - @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_input")) - @public - def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: - return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) - - @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output")) - @public - def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: - return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) - - @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_input")) - @public - def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: - return self.op_execution_context.asset_partitions_time_window_for_input( - input_name=input_name - ) - - @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output")) - @public - def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: - return self.op_execution_context.asset_partitions_time_window_for_output( - output_name=output_name - ) - - @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_input")) - @public - def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: - return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) - - @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output")) - @public - def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: - return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) - @deprecated(**_get_deprecation_kwargs("get_output_metadata")) def get_output_metadata( self, output_name: str, mapping_key: Optional[str] = None From d96360cd1eda207a83828ab90fcaa533b0ab7037 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 13:51:48 -0400 Subject: [PATCH 19/31] make asset provenance and code version populate on init --- .../_core/execution/context/compute.py | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index c875ac547dabf..04cdde6923ec0 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1345,6 +1345,17 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext ) + self._asset_keys = list(self.op_execution_context.assets_def.keys_by_output_name.values()) + self._selected_asset_keys = self._op_execution_context.selected_asset_keys + self._assets_def = self._op_execution_context.assets_def + + self._provenance_by_asset_key = { + key: self._op_execution_context.get_asset_provenance(key) + for key in self._selected_asset_keys + } + self._code_version_by_asset_key = { + key: self._assets_def.code_versions_by_key[key] for key in self._selected_asset_keys + } @public @property @@ -1364,27 +1375,23 @@ def asset_key(self) -> AssetKey: @property def asset_keys(self) -> Sequence[AssetKey]: - return list(self.op_execution_context.assets_def.keys_by_output_name.values()) + return self._asset_keys @property def provenance(self) -> Optional[DataProvenance]: - return self.get_asset_provenance(self.asset_key) + return self._provenance_by_asset_key[self.asset_key] @property def provenance_by_asset_key(self) -> Mapping[AssetKey, Optional[DataProvenance]]: - provenance_map = {} - for key in self.asset_keys: - provenance_map[key] = self.get_asset_provenance(key) - - return provenance_map + return self._provenance_by_asset_key @property def code_version(self) -> Optional[str]: - return self.get_assets_code_version([self.asset_key])[self.asset_key] + return self.code_version_by_asset_key[self.asset_key] @property def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]: - return self.get_assets_code_version(self.asset_keys) + return self._code_version_by_asset_key @public @property @@ -1449,13 +1456,15 @@ def log_event(self, event: UserEvent) -> None: @public @property def assets_def(self) -> AssetsDefinition: - return self._op_execution_context.assets_def + return self._assets_def @public @property def selected_asset_keys(self) -> AbstractSet[AssetKey]: - return self._op_execution_context.selected_asset_keys + return self._selected_asset_keys + # TODO - both get_asset_provenance and get_assets_code_version query the instance - do we want to + # disallow this, or make the function name more indicative that they are querying the db? @public @experimental def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: From 5f5eae7af0ef4fc6251face05615a78f7ce98c96 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 14:19:49 -0400 Subject: [PATCH 20/31] update check specs method --- .../dagster/_core/execution/context/compute.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 04cdde6923ec0..d50df9cf54dc4 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1308,7 +1308,8 @@ def asset_check_spec(self) -> AssetCheckSpec: "get_tag": "use dagster_run.tags.get instead", "run_tags": "use dagster_run.tags instead", "set_data_version": "use MaterializeResult instead", - "run": "use dagster_run instead.", + "run": "use dagster_run instead", + "asset_check_spec": "use check_specs_by_asset_key instead", } # TODO - add AssetCheck related methods to this list @@ -1356,6 +1357,10 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: self._code_version_by_asset_key = { key: self._assets_def.code_versions_by_key[key] for key in self._selected_asset_keys } + self._check_specs_by_asset_key = { + self._op_execution_context.asset_key_for_output(output_name): check + for output_name, check in self._assets_def.check_specs_by_output_name.items() + } @public @property @@ -1479,9 +1484,10 @@ def get_assets_code_version( asset_keys ) + @public @property - def asset_check_spec(self) -> AssetCheckSpec: - return self._op_execution_context.asset_check_spec + def check_specs_by_asset_key(self) -> Mapping[AssetKey, AssetCheckSpec]: + return self._check_specs_by_asset_key @public @property @@ -1694,3 +1700,8 @@ def get_step_execution_context(self) -> StepExecutionContext: @deprecated(**_get_deprecation_kwargs("has_events")) def has_events(self) -> bool: return self.op_execution_context.has_events() + + @deprecated(**_get_deprecation_kwargs("asset_check_spec")) + @property + def asset_check_spec(self) -> AssetCheckSpec: + return self._op_execution_context.asset_check_spec From bf292032be659ec5d36fc71c99dce2a443e3abb1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 14:21:47 -0400 Subject: [PATCH 21/31] remove public marker on deprecated methods --- .../dagster/_core/execution/context/compute.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d50df9cf54dc4..50a3a4e8b92fd 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1565,13 +1565,11 @@ def asset_partitions_def_for_output(self, output_name: str = "result") -> Partit # appropriate deprecation warning @deprecated(**_get_deprecation_kwargs("op_def")) - @public @property def op_def(self) -> OpDefinition: return self.op_execution_context.op_def @deprecated(**_get_deprecation_kwargs("op_config")) - @public @property def op_config(self) -> Any: return self.op_execution_context.op_config @@ -1582,18 +1580,15 @@ def file_manager(self): return self.op_execution_context.file_manager @deprecated(**_get_deprecation_kwargs("has_assets_def")) - @public @property def has_assets_def(self) -> bool: return self.op_execution_context.has_assets_def @deprecated(**_get_deprecation_kwargs("get_mapping_key")) - @public def get_mapping_key(self) -> Optional[str]: return self.op_execution_context.get_mapping_key() @deprecated(**_get_deprecation_kwargs("job_def")) - @public @property def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @@ -1627,7 +1622,6 @@ def consume_events(self) -> Iterator[DagsterEvent]: return self.op_execution_context.consume_events() @deprecated(**_get_deprecation_kwargs("add_output_metadata")) - @public def add_output_metadata( self, metadata: Mapping[str, Any], @@ -1639,12 +1633,10 @@ def add_output_metadata( ) @deprecated(**_get_deprecation_kwargs("asset_key_for_input")) - @public def asset_key_for_input(self, input_name: str) -> AssetKey: return self.op_execution_context.asset_key_for_input(input_name=input_name) @deprecated(**_get_deprecation_kwargs("asset_key_for_output")) - @public def asset_key_for_output(self, output_name: str = "result") -> AssetKey: return self.op_execution_context.asset_key_for_output(output_name=output_name) @@ -1657,23 +1649,19 @@ def get_output_metadata( ) @deprecated(**_get_deprecation_kwargs("output_for_asset_key")) - @public def output_for_asset_key(self, asset_key: AssetKey) -> str: return self.op_execution_context.output_for_asset_key(asset_key=asset_key) @deprecated(**_get_deprecation_kwargs("selected_output_names")) - @public @property def selected_output_names(self) -> AbstractSet[str]: return self.op_execution_context.selected_output_names @deprecated(**_get_deprecation_kwargs("has_tag")) - @public def has_tag(self, key: str) -> bool: return self.op_execution_context.has_tag(key=key) @deprecated(**_get_deprecation_kwargs("get_tag")) - @public def get_tag(self, key: str) -> Optional[str]: return self.op_execution_context.get_tag(key=key) From 519576d8fb671e550fb463463ad7582b4ec88b4f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 15:33:39 -0400 Subject: [PATCH 22/31] tiny cleanup --- .../dagster/dagster/_core/execution/context/compute.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 50a3a4e8b92fd..9e6e9ff55c30a 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1312,8 +1312,6 @@ def asset_check_spec(self) -> AssetCheckSpec: "asset_check_spec": "use check_specs_by_asset_key instead", } -# TODO - add AssetCheck related methods to this list - def _get_deprecation_kwargs(attr: str): deprecation_kwargs = {"breaking_version": "1.7.0"} @@ -1476,7 +1474,7 @@ def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: return self._op_execution_context.get_asset_provenance(asset_key) @public - # TODO - method naming. this needs work + # TODO - method naming def get_assets_code_version( self, asset_keys: Sequence[AssetKey] ) -> Mapping[AssetKey, Optional[str]]: From 38ed7f51d2f0f764456f2b00b8501edb4e4a9a0c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 17:23:52 -0400 Subject: [PATCH 23/31] disable experimental warning --- .../dagster/_core/execution/context/compute.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9e6e9ff55c30a..70719ea6ee6b4 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -44,7 +44,10 @@ from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun from dagster._utils.forked_pdb import ForkedPdb -from dagster._utils.warnings import deprecation_warning +from dagster._utils.warnings import ( + deprecation_warning, + disable_dagster_warnings, +) from .system import StepExecutionContext @@ -1348,16 +1351,16 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: self._selected_asset_keys = self._op_execution_context.selected_asset_keys self._assets_def = self._op_execution_context.assets_def - self._provenance_by_asset_key = { - key: self._op_execution_context.get_asset_provenance(key) - for key in self._selected_asset_keys - } + with disable_dagster_warnings(): + self._provenance_by_asset_key = { + key: self._op_execution_context.get_asset_provenance(key) + for key in self._selected_asset_keys + } self._code_version_by_asset_key = { key: self._assets_def.code_versions_by_key[key] for key in self._selected_asset_keys } self._check_specs_by_asset_key = { - self._op_execution_context.asset_key_for_output(output_name): check - for output_name, check in self._assets_def.check_specs_by_output_name.items() + check.asset_key: check for check in self._assets_def.check_specs_by_output_name.values() } @public From f55bc05bc3b656fb18ee71f01a7eaf2263a899d8 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 15:37:51 -0400 Subject: [PATCH 24/31] add context interface to asset context --- .../dagster/dagster/_core/execution/context/compute.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 70719ea6ee6b4..9e73c606db9f7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -12,6 +12,8 @@ cast, ) +from dagster_ext import IContext + import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -1342,7 +1344,7 @@ def _get_deprecation_kwargs(attr: str): return deprecation_kwargs -class AssetExecutionContext(OpExecutionContext): +class AssetExecutionContext(OpExecutionContext, IContext): def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext From 9ca0fbf6745bf53c914fbe94c98cb95adc949bb3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 16:04:24 -0400 Subject: [PATCH 25/31] remove interface for now --- .../dagster/_core/execution/context/compute.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9e73c606db9f7..faeb87278b95c 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -12,8 +12,7 @@ cast, ) -from dagster_ext import IContext - +# from dagster_ext import IContext import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -1279,6 +1278,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "step_launcher", "has_events", "consume_events", + "log_event", ] ) @@ -1344,7 +1344,7 @@ def _get_deprecation_kwargs(attr: str): return deprecation_kwargs -class AssetExecutionContext(OpExecutionContext, IContext): +class AssetExecutionContext(OpExecutionContext): def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext @@ -1457,10 +1457,6 @@ def log(self) -> DagsterLogManager: """DagsterLogManager: The log manager available in the execution context.""" return self._op_execution_context.log - @public - def log_event(self, event: UserEvent) -> None: - return self._op_execution_context.log_event(event) - @public @property def assets_def(self) -> AssetsDefinition: @@ -1696,3 +1692,7 @@ def has_events(self) -> bool: @property def asset_check_spec(self) -> AssetCheckSpec: return self._op_execution_context.asset_check_spec + + @deprecated(**_get_deprecation_kwargs("log_event")) + def log_event(self, event: UserEvent) -> None: + return self._op_execution_context.log_event(event) From 89f7433e52209c3427273549ffbf0f0e9cf27370 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 09:03:36 -0400 Subject: [PATCH 26/31] update with interface changes --- .../_core/execution/context/compute.py | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index faeb87278b95c..c7333670fcca0 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1371,11 +1371,6 @@ def op_execution_context(self) -> OpExecutionContext: return self._op_execution_context # IContext interface methods - - @property - def is_asset_step(self) -> bool: - return self.op_execution_context.has_assets_def - @public @property def asset_key(self) -> AssetKey: @@ -1403,22 +1398,9 @@ def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]: @public @property - def is_partition_step(self) -> bool: + def is_partitioned(self) -> bool: return self._op_execution_context.has_partition_key - @property - def partition_key(self) -> str: - return self.op_execution_context.partition_key - - @public - @property - def partition_key_range(self) -> PartitionKeyRange: - return self._op_execution_context.partition_key_range - - @property - def partition_time_window(self) -> TimeWindow: - return self.op_execution_context.partition_time_window - @public @property def run_id(self) -> str: @@ -1500,7 +1482,19 @@ def run_config(self) -> Mapping[str, object]: """dict: The run config for the current execution.""" return self._op_execution_context.run_config - # partition methods that will be marked deprecated once we have aligned on future partition methods + # partition methods that may be marked deprecated once we have aligned on future partition methods + @property + def partition_key(self) -> str: + return self.op_execution_context.partition_key + + @public + @property + def partition_key_range(self) -> PartitionKeyRange: + return self._op_execution_context.partition_key_range + + @property + def partition_time_window(self) -> TimeWindow: + return self.op_execution_context.partition_time_window @public def asset_partition_key_for_input(self, input_name: str) -> str: From d01d5d2713db77daf01a5158fc72cb7eea9c94f0 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:26:31 -0400 Subject: [PATCH 27/31] add interface back --- .../dagster/dagster/_core/execution/context/compute.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index c7333670fcca0..64031c0a24ac6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -12,7 +12,8 @@ cast, ) -# from dagster_ext import IContext +from dagster_ext import IContext + import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -1344,7 +1345,7 @@ def _get_deprecation_kwargs(attr: str): return deprecation_kwargs -class AssetExecutionContext(OpExecutionContext): +class AssetExecutionContext(OpExecutionContext, IContext): def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext From b129fbfd9e06576b1650b963cee2ade375c3a4aa Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 11:13:54 -0400 Subject: [PATCH 28/31] update selected asset keys to asset keys --- .../_core/execution/context/compute.py | 34 +++++-------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 64031c0a24ac6..cc14e61fec742 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1316,6 +1316,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "set_data_version": "use MaterializeResult instead", "run": "use dagster_run instead", "asset_check_spec": "use check_specs_by_asset_key instead", + "selected_asset_keys": "use asset_keys instead", } @@ -1350,17 +1351,16 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext ) - self._asset_keys = list(self.op_execution_context.assets_def.keys_by_output_name.values()) - self._selected_asset_keys = self._op_execution_context.selected_asset_keys + self._asset_keys = self._op_execution_context.selected_asset_keys self._assets_def = self._op_execution_context.assets_def with disable_dagster_warnings(): self._provenance_by_asset_key = { key: self._op_execution_context.get_asset_provenance(key) - for key in self._selected_asset_keys + for key in self._asset_keys } self._code_version_by_asset_key = { - key: self._assets_def.code_versions_by_key[key] for key in self._selected_asset_keys + key: self._assets_def.code_versions_by_key[key] for key in self._asset_keys } self._check_specs_by_asset_key = { check.asset_key: check for check in self._assets_def.check_specs_by_output_name.values() @@ -1445,27 +1445,6 @@ def log(self) -> DagsterLogManager: def assets_def(self) -> AssetsDefinition: return self._assets_def - @public - @property - def selected_asset_keys(self) -> AbstractSet[AssetKey]: - return self._selected_asset_keys - - # TODO - both get_asset_provenance and get_assets_code_version query the instance - do we want to - # disallow this, or make the function name more indicative that they are querying the db? - @public - @experimental - def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: - return self._op_execution_context.get_asset_provenance(asset_key) - - @public - # TODO - method naming - def get_assets_code_version( - self, asset_keys: Sequence[AssetKey] - ) -> Mapping[AssetKey, Optional[str]]: - return self.op_execution_context.instance.get_latest_materialization_code_versions( - asset_keys - ) - @public @property def check_specs_by_asset_key(self) -> Mapping[AssetKey, AssetCheckSpec]: @@ -1558,6 +1537,11 @@ def asset_partitions_def_for_output(self, output_name: str = "result") -> Partit # deprecated methods. All remaining methods on OpExecutionContext should be here with the # appropriate deprecation warning + @deprecated(**_get_deprecation_kwargs("selected_asset_keys")) + @property + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self.op_execution_context.selected_asset_keys + @deprecated(**_get_deprecation_kwargs("op_def")) @property def op_def(self) -> OpDefinition: From 2e71fd1810b9963b2e19f0b91eece5362bc6fe14 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 11:41:05 -0400 Subject: [PATCH 29/31] random bk --- .../dagster-buildkite/dagster_buildkite/steps/dagster.py | 9 ++++++--- .../dagster/dagster/_core/execution/context/compute.py | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py index fafe7ab8e6a4c..24e9135a4f78a 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py @@ -140,7 +140,10 @@ def build_sql_schema_check_steps() -> List[CommandStep]: return [ CommandStepBuilder(":mysql: mysql-schema") .on_test_image(AvailablePythonVersion.get_default()) - .run("pip install -e python_modules/dagster", "python scripts/check_schemas.py") + .run( + "pip install -e python_modules/dagster -e python_modules/dagster-ext", + "python scripts/check_schemas.py", + ) .with_skip(skip_mysql_if_no_changes_to_dependencies(["dagster"])) .build() ] @@ -151,8 +154,8 @@ def build_graphql_python_client_backcompat_steps() -> List[CommandStep]: CommandStepBuilder(":graphql: GraphQL Python Client backcompat") .on_test_image(AvailablePythonVersion.get_default()) .run( - "pip install -e python_modules/dagster[test] -e python_modules/dagster-graphql -e" - " python_modules/automation", + "pip install -e python_modules/dagster[test] -e python_modules/dagster-graphql -e " + " python_modules/automation -e python_modules/dagster-ext", "dagster-graphql-client query check", ) .with_skip( diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index cc14e61fec742..8892d565a5618 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1280,6 +1280,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "has_events", "consume_events", "log_event", + "get_asset_provenance", ] ) @@ -1675,3 +1676,8 @@ def asset_check_spec(self) -> AssetCheckSpec: @deprecated(**_get_deprecation_kwargs("log_event")) def log_event(self, event: UserEvent) -> None: return self._op_execution_context.log_event(event) + + @deprecated(**_get_deprecation_kwargs("get_asset_provenance")) + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self._op_execution_context.get_asset_provenance(asset_key) From fded8b2c2882500078c69d44cd1df1d0c6ce51c3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 12:26:21 -0400 Subject: [PATCH 30/31] add partition methods --- .../_core/execution/context/compute.py | 76 ++++++++++++++- .../dagster/_core/execution/context/system.py | 93 +++++++++++++++---- 2 files changed, 149 insertions(+), 20 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 8892d565a5618..28540d9c1b741 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -28,6 +28,7 @@ AssetKey, AssetMaterialization, AssetObservation, + CoercibleToAssetKey, ExpectationResult, UserEvent, ) @@ -870,7 +871,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): """ - return self._step_execution_context.asset_partition_key_range_for_input(input_name) + asset_key = self.asset_key_for_input(input_name) + return self._step_execution_context.partition_key_range_for_asset( + asset_key, is_dependency=True + ) @public def asset_partition_key_for_input(self, input_name: str) -> str: @@ -1451,6 +1455,61 @@ def assets_def(self) -> AssetsDefinition: def check_specs_by_asset_key(self) -> Mapping[AssetKey, AssetCheckSpec]: return self._check_specs_by_asset_key + @public + def partition_key_range_for_asset_key( + self, asset_key: CoercibleToAssetKey, is_dependency: bool = False + ) -> PartitionKeyRange: + """Return the PartitionKeyRange for the provided asset. Errors if there is more or less than one. + + If you want to write your asset to support running a backfill of several partitions in a single run, + you can use partition_key_range_for_asset to get the range of partitions being materialized + by the backfill. + + Args: + asset_key (Union[AssetKey, str]): The asset to get the partition key range for + is_dependency (bool): If the asset is a self-dependent asset, you can set is_dependency=True to + fetch the partition key range of the asset as an upstream dependency + + Examples: + .. code-block:: python + + partitions_def = DailyPartitionsDefinition("2023-08-20") + @asset( + partitions_def=partitions_def + ) + def upstream_asset(context: AssetExecutionContext): + context.log.info(context.partition_key_range_for_asset("upstream_asset")) + # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: + # PartitionKeyRange(start="2023-08-21", end="2023-08-25") + @asset( + partitions_def=partitions_def, + ) + def downstream_asset(context: AssetExecutionContext, upstream_asset): + context.log.info(context.partition_key_range_for_asset("upstream_asset")) + context.log.info(context.partition_key_range_for_asset("downstream_asset")) + # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: + # PartitionKeyRange(start="2023-08-21", end="2023-08-25") + # PartitionKeyRange(start="2023-08-21", end="2023-08-25") + @asset( + partitions_def=partitions_def, + ins={ + "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)) + } + ) + def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): + context.log.info(context.partition_key_range_for_asset("self_dependent_asset", is_dependency=True)) + context.log.info(context.partition_key_range_for_asset("self_dependent_asset")) + # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: + # PartitionKeyRange(start="2023-08-20", end="2023-08-24") + # PartitionKeyRange(start="2023-08-21", end="2023-08-25") + """ + return self._step_execution_context.partition_key_range_for_asset( + asset_key, is_dependency=is_dependency + ) + + # TODO the below methods weren't originally part of the deprecated list, but are also not part + # of the context interface. What should we do with them? + @public @property def resources(self) -> Any: @@ -1477,18 +1536,25 @@ def partition_key_range(self) -> PartitionKeyRange: def partition_time_window(self) -> TimeWindow: return self.op_execution_context.partition_time_window + # deprecated methods. All remaining methods on OpExecutionContext should be here with the + # appropriate deprecation warning + + @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_input")) @public def asset_partition_key_for_input(self, input_name: str) -> str: return self.op_execution_context.asset_partition_key_for_input(input_name=input_name) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output")) @public def asset_partition_key_for_output(self, output_name: str = "result") -> str: return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_input")) @public def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: return self.op_execution_context.asset_partition_key_range_for_input(input_name=input_name) + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output")) @public def asset_partition_key_range_for_output( self, output_name: str = "result" @@ -1497,40 +1563,48 @@ def asset_partition_key_range_for_output( output_name=output_name ) + @deprecated(**_get_deprecation_kwargs("has_partition_key")) @public @property def has_partition_key(self) -> bool: return self.op_execution_context.has_partition_key + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range")) @public @property def asset_partition_key_range(self) -> PartitionKeyRange: return self.op_execution_context.asset_partition_key_range + @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_input")) @public def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) + @deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output")) @public def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) + @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_input")) @public def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: return self.op_execution_context.asset_partitions_time_window_for_input( input_name=input_name ) + @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output")) @public def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: return self.op_execution_context.asset_partitions_time_window_for_output( output_name=output_name ) + @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_input")) @public def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) + @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output")) @public def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index e36f176ef9b27..e8ab84b9c32b1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -30,7 +30,7 @@ extract_data_version_from_entry, ) from dagster._core.definitions.dependency import OpNode -from dagster._core.definitions.events import AssetKey, AssetLineageInfo +from dagster._core.definitions.events import AssetKey, AssetLineageInfo, CoercibleToAssetKey from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.job_base import IJob from dagster._core.definitions.job_definition import JobDefinition @@ -662,7 +662,7 @@ def for_input_manager( node_handle=self.node_handle, input_name=name ) asset_partitions_subset = ( - self.asset_partitions_subset_for_input(name) + self.partitions_subset_for_upstream_asset(asset_key) if self.has_asset_partitions_for_input(name) else None ) @@ -932,8 +932,8 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key) # Exclude AllPartitionMapping for now to avoid huge queries if input_name and self.has_asset_partitions_for_input(input_name): - subset = self.asset_partitions_subset_for_input( - input_name, require_valid_partitions=False + subset = self.partitions_subset_for_upstream_asset( + key, require_valid_partitions=False ) input_keys = list(subset.get_partition_keys()) @@ -1043,26 +1043,78 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: and asset_layer.partitions_def_for_asset(upstream_asset_key) is not None ) - def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - subset = self.asset_partitions_subset_for_input(input_name) - partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + def _load_partition_info_as_upstream_asset( + self, current_asset: CoercibleToAssetKey, is_dependency: bool + ) -> bool: + # In some cases, if the asset we are getting the partition info for is a parent of the asset that is currently + # materializing, we need to load the info in a different way. So we must first figure out + # if asset is the parent of the currently materializing asset + + asset_layer = self.job_def.asset_layer + currently_materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) + asset_key = AssetKey.from_coercible(current_asset) + + is_resulting_asset = ( + currently_materializing_assets_def is not None + and asset_key in currently_materializing_assets_def.keys_by_output_name.values() + ) + is_upstream_asset = ( + currently_materializing_assets_def is not None + and asset_key in currently_materializing_assets_def.keys_by_input_name.values() ) - if len(partition_key_ranges) != 1: - check.failed( - "Tried to access asset partition key range, but there are " - f"({len(partition_key_ranges)}) key ranges associated with this input.", + # when `asset` is a self-dependent partitioned asset, then is_resulting_asset and is_upstream_asset + # are both True. In this case, we defer to the user-provided is_dependency parameter. If + # is_dependency is True, then we return the partition info of asset as an upstream asset + get_partition_key_range_as_upstream_asset = ( + is_upstream_asset and not is_resulting_asset + ) or (is_upstream_asset and is_resulting_asset and is_dependency) + + return get_partition_key_range_as_upstream_asset + + def partition_key_range_for_asset( + self, asset: Optional[CoercibleToAssetKey], is_dependency: bool = False + ) -> PartitionKeyRange: + if asset is None: + check.failed(f"Tried to access partition key range with invalid asset key: {asset}") + if self._load_partition_info_as_upstream_asset( + current_asset=asset, is_dependency=is_dependency + ): + subset = self.partitions_subset_for_upstream_asset(asset) + partition_key_ranges = subset.get_partition_key_ranges( + dynamic_partitions_store=self.instance ) - return partition_key_ranges[0] + if len(partition_key_ranges) != 1: + check.failed( + "Tried to access asset partition key range, but there are " + f"({len(partition_key_ranges)}) key ranges associated with this input.", + ) + + return partition_key_ranges[0] + else: + return self.asset_partition_key_range + + # def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + # subset = self.asset_partitions_subset_for_input(input_name) + # partition_key_ranges = subset.get_partition_key_ranges( + # dynamic_partitions_store=self.instance + # ) - def asset_partitions_subset_for_input( - self, input_name: str, *, require_valid_partitions: bool = True + # if len(partition_key_ranges) != 1: + # check.failed( + # "Tried to access asset partition key range, but there are " + # f"({len(partition_key_ranges)}) key ranges associated with this input.", + # ) + + # return partition_key_ranges[0] + + def partitions_subset_for_upstream_asset( + self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: asset_layer = self.job_def.asset_layer assets_def = asset_layer.assets_def_for_node(self.node_handle) - upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + upstream_asset_key = AssetKey.from_coercible(asset) if upstream_asset_key is not None: upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) @@ -1107,7 +1159,8 @@ def asset_partitions_subset_for_input( check.failed("The input has no asset partitions") def asset_partition_key_for_input(self, input_name: str) -> str: - start, end = self.asset_partition_key_range_for_input(input_name) + asset_key = self.job_def.asset_layer.asset_key_for_input(self.node_handle, input_name) + start, end = self.partition_key_range_for_asset(asset_key, is_dependency=True) if start == end: return start else: @@ -1204,7 +1257,7 @@ def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: if not has_one_dimension_time_window_partitioning(upstream_asset_partitions_def): raise ValueError( - "Tried to get asset partitions for an input that correponds to a partitioned " + "Tried to get asset partitions for an input that corresponds to a partitioned " "asset that is not time-partitioned." ) @@ -1212,7 +1265,9 @@ def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], upstream_asset_partitions_def, ) - partition_key_range = self.asset_partition_key_range_for_input(input_name) + partition_key_range = self.partition_key_range_for_asset( + upstream_asset_key, is_dependency=True + ) return TimeWindow( upstream_asset_partitions_def.time_window_for_partition_key( From 737c656cfc55e56f97e9804a8ae011147f4b2959 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 13:19:39 -0400 Subject: [PATCH 31/31] small updates and testing --- .../_core/execution/context/compute.py | 5 +-- .../dagster/_core/execution/context/system.py | 22 +++---------- .../test_partitioned_assets.py | 32 ++++++++++--------- 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 28540d9c1b741..2dc0274ac22a6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1133,9 +1133,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: # ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"] """ + asset_key = self.asset_key_for_input(input_name) return list( - self._step_execution_context.asset_partitions_subset_for_input( - input_name + self._step_execution_context.partitions_subset_for_upstream_asset( + asset_key ).get_partition_keys() ) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index e8ab84b9c32b1..152cd4b9ccd3b 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1076,7 +1076,7 @@ def partition_key_range_for_asset( self, asset: Optional[CoercibleToAssetKey], is_dependency: bool = False ) -> PartitionKeyRange: if asset is None: - check.failed(f"Tried to access partition key range with invalid asset key: {asset}") + check.failed(f"Tried to access partition key range for invalid asset key: {asset}") if self._load_partition_info_as_upstream_asset( current_asset=asset, is_dependency=is_dependency ): @@ -1095,23 +1095,11 @@ def partition_key_range_for_asset( else: return self.asset_partition_key_range - # def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - # subset = self.asset_partitions_subset_for_input(input_name) - # partition_key_ranges = subset.get_partition_key_ranges( - # dynamic_partitions_store=self.instance - # ) - - # if len(partition_key_ranges) != 1: - # check.failed( - # "Tried to access asset partition key range, but there are " - # f"({len(partition_key_ranges)}) key ranges associated with this input.", - # ) - - # return partition_key_ranges[0] - def partitions_subset_for_upstream_asset( - self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True + self, asset: Optional[CoercibleToAssetKey], *, require_valid_partitions: bool = True ) -> PartitionsSubset: + if asset is None: + check.failed(f"Tried to access partition for invalid asset key: {asset}") asset_layer = self.job_def.asset_layer assets_def = asset_layer.assets_def_for_node(self.node_handle) upstream_asset_key = AssetKey.from_coercible(asset) @@ -1156,7 +1144,7 @@ def partitions_subset_for_upstream_asset( return mapped_partitions_result.partitions_subset - check.failed("The input has no asset partitions") + check.failed(f"The asset {asset} has no partitions") def asset_partition_key_for_input(self, input_name: str) -> str: asset_key = self.job_def.asset_layer.asset_key_for_input(self.node_handle, input_name) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index e7f694598b76e..cd9c7a194ef61 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -5,6 +5,7 @@ import pendulum import pytest from dagster import ( + AssetExecutionContext, AssetMaterialization, AssetOut, AssetsDefinition, @@ -140,7 +141,7 @@ def test_single_partitioned_asset_job(): class MyIOManager(IOManager): def handle_output(self, context, obj): - assert context.asset_partition_key == "b" + assert context.partition_key == "b" assert context.asset_partitions_def == partitions_def def load_input(self, context): @@ -204,24 +205,24 @@ def test_access_partition_keys_from_context_direct_invocation(): partitions_def = StaticPartitionsDefinition(["a"]) @asset(partitions_def=partitions_def) - def partitioned_asset(context): - assert context.asset_partition_key_for_output() == "a" + def partitioned_asset(context: AssetExecutionContext): + assert context.partition_key == "a" context = build_asset_context(partition_key="a") # check unbound context - assert context.asset_partition_key_for_output() == "a" + assert context.partition_key == "a" # check bound context partitioned_asset(context) # check failure for non-partitioned asset @asset - def non_partitioned_asset(context): + def non_partitioned_asset(context: AssetExecutionContext): with pytest.raises( CheckError, match="Tried to access partition_key for a non-partitioned asset" ): - context.asset_partition_key_for_output() + context.partition_key # noqa: B018 context = build_asset_context() non_partitioned_asset(context) @@ -249,8 +250,8 @@ def load_input(self, context): assert context.asset_partition_key_range == PartitionKeyRange("a", "c") @asset(partitions_def=upstream_partitions_def) - def upstream_asset(context): - assert context.asset_partition_key_for_output() == "b" + def upstream_asset(context: AssetExecutionContext): + assert context.partition_key == "b" @asset def downstream_asset(upstream_asset): @@ -572,8 +573,8 @@ def test_mismatched_job_partitioned_config_with_asset_partitions(): daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01") @asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def) - def asset1(context): - assert context.op_config["day_of_month"] == 1 + def asset1(context: AssetExecutionContext): + assert context.op_execution_context.op_config["day_of_month"] == 1 assert context.partition_key == "2020-01-01" @hourly_partitioned_config(start_date="2020-01-01-00:00") @@ -596,8 +597,8 @@ def test_partition_range_single_run(): partitions_def = DailyPartitionsDefinition(start_date="2020-01-01") @asset(partitions_def=partitions_def) - def upstream_asset(context) -> None: - assert context.asset_partition_key_range_for_output() == PartitionKeyRange( + def upstream_asset(context: AssetExecutionContext) -> None: + assert context.partition_key_range == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) @@ -640,8 +641,8 @@ def test_multipartition_range_single_run(): ) @asset(partitions_def=partitions_def) - def multipartitioned_asset(context) -> None: - key_range = context.asset_partition_key_range_for_output() + def multipartitioned_asset(context: AssetExecutionContext) -> None: + key_range = context.partition_key_range assert isinstance(key_range.start, MultiPartitionKey) assert isinstance(key_range.end, MultiPartitionKey) @@ -649,7 +650,8 @@ def multipartitioned_asset(context) -> None: assert key_range.end == MultiPartitionKey({"date": "2020-01-03", "abc": "a"}) assert all( - isinstance(key, MultiPartitionKey) for key in context.asset_partition_keys_for_output() + isinstance(key, MultiPartitionKey) + for key in partitions_def.get_partitions_keys_in_range(context.partition_key_range) ) the_job = define_asset_job("job").resolve(