diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 5ae32ca24a950..0cec5aa26de1d 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index ba9a07c7c5606..35a1832f7d94d 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 2a6477f9c07e8..c4e4ebbd56cfe 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 16ba24c9b71dd..eb19d3f3a7e90 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index bf226555434ac..78aeec3376250 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1,7 +1,9 @@ from abc import ABC, ABCMeta, abstractmethod from contextlib import contextmanager from contextvars import ContextVar -from inspect import _empty as EmptyAnnotation +from inspect import ( + _empty as EmptyAnnotation, +) from typing import ( AbstractSet, Any, @@ -17,7 +19,11 @@ ) import dagster._check as check -from dagster._annotations import deprecated, experimental, public +from dagster._annotations import ( + deprecated, + experimental, + public, +) from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.assets import AssetsDefinition @@ -50,7 +56,6 @@ from dagster._core.instance import DagsterInstance from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun -from dagster._utils.cached_method import cached_method from dagster._utils.forked_pdb import ForkedPdb from dagster._utils.warnings import ( deprecation_warning, @@ -1343,9 +1348,17 @@ def get() -> "OpExecutionContext": return ctx.get_op_execution_context() +def _copy_docs_from_op_execution_context(obj): + setattr(obj, "__doc__", getattr(OpExecutionContext, obj.__name__).__doc__) + return obj + + class AssetExecutionContext(OpExecutionContext): - def __init__(self, step_execution_context: StepExecutionContext): - super().__init__(step_execution_context=step_execution_context) + def __init__(self, op_execution_context: OpExecutionContext) -> None: + self._op_execution_context = check.inst_param( + op_execution_context, "op_execution_context", OpExecutionContext + ) + self._step_execution_context = self._op_execution_context._step_execution_context # noqa: SLF001 @staticmethod def get() -> "AssetExecutionContext": @@ -1354,9 +1367,378 @@ def get() -> "AssetExecutionContext": raise DagsterInvariantViolationError("No current AssetExecutionContext in scope.") return ctx - @cached_method + @property + def op_execution_context(self) -> OpExecutionContext: + return self._op_execution_context + + #### Run related + + @property + @_copy_docs_from_op_execution_context + def run(self) -> DagsterRun: + return self.op_execution_context.run + + @property + @_copy_docs_from_op_execution_context + def dagster_run(self) -> DagsterRun: + return self.op_execution_context.dagster_run + + @property + @_copy_docs_from_op_execution_context + def run_id(self) -> str: + return self.op_execution_context.run_id + + @property + @_copy_docs_from_op_execution_context + def run_config(self) -> Mapping[str, object]: + return self.op_execution_context.run_config + + @property + @_copy_docs_from_op_execution_context + def run_tags(self) -> Mapping[str, str]: + return self.op_execution_context.run_tags + + @public + @_copy_docs_from_op_execution_context + def has_tag(self, key: str) -> bool: + return self.op_execution_context.has_tag(key) + + @public + @_copy_docs_from_op_execution_context + def get_tag(self, key: str) -> Optional[str]: + return self.op_execution_context.get_tag(key) + + #### op related + + @property + @_copy_docs_from_op_execution_context + def retry_number(self): + return self.op_execution_context.retry_number + + @public + @property + @_copy_docs_from_op_execution_context + def op_config(self) -> Any: + return self.op_execution_context.op_config + + @property + @_copy_docs_from_op_execution_context + def node_handle(self) -> NodeHandle: + return self.op_execution_context.node_handle + + @property + @_copy_docs_from_op_execution_context + def op_handle(self) -> NodeHandle: + return self.op_execution_context.op_handle + + @property + @_copy_docs_from_op_execution_context + def op(self) -> Node: + return self.op_execution_context.op + + @public + @property + @_copy_docs_from_op_execution_context + def op_def(self) -> OpDefinition: + return self.op_execution_context.op_def + + @_copy_docs_from_op_execution_context + def describe_op(self) -> str: + return self.op_execution_context.describe_op() + + @public + @_copy_docs_from_op_execution_context + def get_mapping_key(self) -> Optional[str]: + return self.op_execution_context.get_mapping_key() + + @public + @property + @_copy_docs_from_op_execution_context + def selected_output_names(self) -> AbstractSet[str]: + return self.op_execution_context.selected_output_names + + #### job related + + @public + @property + @_copy_docs_from_op_execution_context + def job_name(self) -> str: + return self.op_execution_context.job_name + + @public + @property + @_copy_docs_from_op_execution_context + def job_def(self) -> JobDefinition: + return self.op_execution_context.job_def + + #### asset related + + @public + @property + @_copy_docs_from_op_execution_context + def asset_key(self) -> AssetKey: + return self.op_execution_context.asset_key + + @public + @property + @_copy_docs_from_op_execution_context + def has_assets_def(self) -> bool: + return self.op_execution_context.has_assets_def + + @public + @property + @_copy_docs_from_op_execution_context + def assets_def(self) -> AssetsDefinition: + return self.op_execution_context.assets_def + + @public + @_copy_docs_from_op_execution_context + def asset_key_for_output(self, output_name: str = "result") -> AssetKey: + return self.op_execution_context.asset_key_for_output(output_name=output_name) + + @public + @_copy_docs_from_op_execution_context + def output_for_asset_key(self, asset_key: AssetKey) -> str: + return self.op_execution_context.output_for_asset_key(asset_key=asset_key) + + @public + @_copy_docs_from_op_execution_context + def asset_key_for_input(self, input_name: str) -> AssetKey: + return self.op_execution_context.asset_key_for_input(input_name=input_name) + + @public + @property + @_copy_docs_from_op_execution_context + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self.op_execution_context.selected_asset_keys + + #### execution related + + @public + @property + @_copy_docs_from_op_execution_context + def instance(self) -> DagsterInstance: + return self.op_execution_context.instance + + @property + @_copy_docs_from_op_execution_context + def step_launcher(self) -> Optional[StepLauncher]: + return self.op_execution_context.step_launcher + + @_copy_docs_from_op_execution_context + def get_step_execution_context(self) -> StepExecutionContext: + return self.op_execution_context.get_step_execution_context() + + #### partition_related + + @public + @property + @_copy_docs_from_op_execution_context + def has_partition_key(self) -> bool: + return self.op_execution_context.has_partition_key + + @public + @property + @_copy_docs_from_op_execution_context + def partition_key(self) -> str: + return self.op_execution_context.partition_key + + @public + @property + @_copy_docs_from_op_execution_context + def partition_keys(self) -> Sequence[str]: + return self.op_execution_context.partition_keys + + @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") + @public + @property + @_copy_docs_from_op_execution_context + def asset_partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range + + @public + @property + @_copy_docs_from_op_execution_context + def partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.partition_key_range + + @public + @property + @_copy_docs_from_op_execution_context + def partition_time_window(self) -> TimeWindow: + return self.op_execution_context.partition_time_window + + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_for_output(self, output_name: str = "result") -> str: + return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_output(output_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_range_for_output( + self, output_name: str = "result" + ) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_output(output_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_input(input_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_for_input(self, input_name: str) -> str: + return self.op_execution_context.asset_partition_key_for_input(input_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) + + @public + @_copy_docs_from_op_execution_context + def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_input(input_name) + + #### Event log related + + @_copy_docs_from_op_execution_context + def has_events(self) -> bool: + return self.op_execution_context.has_events() + + @_copy_docs_from_op_execution_context + def consume_events(self) -> Iterator[DagsterEvent]: + yield from self.op_execution_context.consume_events() + + @public + @_copy_docs_from_op_execution_context + def log_event(self, event: UserEvent) -> None: + return self.op_execution_context.log_event(event) + + #### metadata related + + @public + @_copy_docs_from_op_execution_context + def add_output_metadata( + self, + metadata: Mapping[str, Any], + output_name: Optional[str] = None, + mapping_key: Optional[str] = None, + ) -> None: + return self.op_execution_context.add_output_metadata( + metadata=metadata, output_name=output_name, mapping_key=mapping_key + ) + + @_copy_docs_from_op_execution_context + def get_output_metadata( + self, output_name: str, mapping_key: Optional[str] = None + ) -> Optional[Mapping[str, Any]]: + return self.op_execution_context.get_output_metadata( + output_name=output_name, mapping_key=mapping_key + ) + + #### asset check related + + @public + @property + @_copy_docs_from_op_execution_context + def has_asset_checks_def(self) -> bool: + return self.op_execution_context.has_asset_checks_def + + @public + @property + @_copy_docs_from_op_execution_context + def asset_checks_def(self) -> AssetChecksDefinition: + return self.op_execution_context.asset_checks_def + + @public + @property + @_copy_docs_from_op_execution_context + def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return self.op_execution_context.selected_asset_check_keys + + @property + @_copy_docs_from_op_execution_context + def asset_check_spec(self) -> AssetCheckSpec: + return self.op_execution_context.asset_check_spec + + #### data lineage related + + @public + @experimental + @_copy_docs_from_op_execution_context + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self.op_execution_context.get_asset_provenance(asset_key=asset_key) + + @_copy_docs_from_op_execution_context + def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: + return self.op_execution_context.set_data_version( + asset_key=asset_key, data_version=data_version + ) + + # misc + + @public + @property + @_copy_docs_from_op_execution_context + def resources(self) -> Any: + return self.op_execution_context.resources + + @public + @property + @_copy_docs_from_op_execution_context + def log(self) -> DagsterLogManager: + return self.op_execution_context.log + + @public + @property + @_copy_docs_from_op_execution_context + def pdb(self) -> ForkedPdb: + return self.op_execution_context.pdb + + @property + @_copy_docs_from_op_execution_context + def is_subset(self): + return self.op_execution_context.is_subset + + # In this mode no conversion is done on returned values and missing but expected outputs are not + # allowed. + @property + @_copy_docs_from_op_execution_context + def requires_typed_event_stream(self) -> bool: + return self.op_execution_context.requires_typed_event_stream + + @property + @_copy_docs_from_op_execution_context + def typed_event_stream_error_message(self) -> Optional[str]: + return self.op_execution_context.typed_event_stream_error_message + + @_copy_docs_from_op_execution_context + def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None: + self.op_execution_context.set_requires_typed_event_stream(error_message=error_message) + def get_op_execution_context(self) -> "OpExecutionContext": - return OpExecutionContext(self._step_execution_context) + return self.op_execution_context @contextmanager @@ -1414,7 +1796,7 @@ def enter_execution_context( ) # Structured assuming upcoming changes to make AssetExecutionContext contain an OpExecutionContext - asset_ctx = AssetExecutionContext(step_context) + asset_ctx = AssetExecutionContext(op_execution_context=OpExecutionContext(step_context)) asset_token = _current_asset_execution_context.set(asset_ctx) try: @@ -1426,13 +1808,13 @@ def enter_execution_context( if is_asset_check: yield asset_ctx elif is_op_in_graph_asset or not is_sda_step: - yield asset_ctx.get_op_execution_context() + yield asset_ctx.op_execution_context else: yield asset_ctx elif context_annotation is AssetExecutionContext: yield asset_ctx else: - yield asset_ctx.get_op_execution_context() + yield asset_ctx.op_execution_context finally: _current_asset_execution_context.reset(asset_token) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py new file mode 100644 index 0000000000000..2c5773688cad3 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -0,0 +1,19 @@ +from dagster import AssetExecutionContext, OpExecutionContext + + +def test_doc_strings(): + ignores = [ + "_abc_impl", + "_events", + "_output_metadata", + "_pdb", + "_step_execution_context", + ] + for attr_name in dir(OpExecutionContext): + if attr_name.startswith("__") or attr_name in ignores: + continue + if hasattr(AssetExecutionContext, attr_name): + op_attr = getattr(OpExecutionContext, attr_name) + asset_attr = getattr(AssetExecutionContext, attr_name) + + assert op_attr.__doc__ == asset_attr.__doc__ diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 6bd4da58fb3d2..1fd029361981b 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -58,8 +58,8 @@ def __init__(self): @op def test_op_context_instance_check(context: OpExecutionContext): step_context = context._step_execution_context # noqa: SLF001 - asset_context = AssetExecutionContext(step_execution_context=step_context) op_context = OpExecutionContext(step_execution_context=step_context) + asset_context = AssetExecutionContext(op_execution_context=op_context) with pytest.raises(DeprecationWarning): isinstance(asset_context, OpExecutionContext) assert not isinstance(op_context, AssetExecutionContext)