From 52e8f24665835a84d96b3c53ebbf5fc7a4db1379 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 18 Dec 2023 13:20:06 -0500 Subject: [PATCH] organize methods by topic --- .../_core/execution/context/compute.py | 273 +++++++++++------- 1 file changed, 170 insertions(+), 103 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 46d91a6e2c0ae..db5d2348330f1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1370,151 +1370,107 @@ def get() -> "AssetExecutionContext": def op_execution_context(self) -> OpExecutionContext: return self._op_execution_context - #### op-related + #### Run related @property @_copy_docs_from_op_execution_context - def retry_number(self): - return self.op_execution_context.retry_number + def run(self) -> DagsterRun: + return self.op_execution_context.run - @public @property @_copy_docs_from_op_execution_context - def op_config(self) -> Any: - return self.op_execution_context.op_config + def dagster_run(self) -> DagsterRun: + return self.op_execution_context.dagster_run - @public @property @_copy_docs_from_op_execution_context - def job_name(self) -> str: - return self.op_execution_context.job_name + def run_id(self) -> str: + return self.op_execution_context.run_id @property @_copy_docs_from_op_execution_context - def node_handle(self) -> NodeHandle: - return self.op_execution_context.node_handle + def run_config(self) -> Mapping[str, object]: + return self.op_execution_context.run_config @property @_copy_docs_from_op_execution_context - def op_handle(self) -> NodeHandle: - return self.op_execution_context.op_handle + def run_tags(self) -> Mapping[str, str]: + return self.op_execution_context.run_tags - @property + @public @_copy_docs_from_op_execution_context - def op(self) -> Node: - return self.op_execution_context.op + def has_tag(self, key: str) -> bool: + return self.op_execution_context.has_tag(key) @public - @property @_copy_docs_from_op_execution_context - def op_def(self) -> OpDefinition: - return self.op_execution_context.op_def + def get_tag(self, key: str) -> Optional[str]: + return self.op_execution_context.get_tag(key) - #### execution related + #### op related - @public @property @_copy_docs_from_op_execution_context - def instance(self) -> DagsterInstance: - return self.op_execution_context.instance + def retry_number(self): + return self.op_execution_context.retry_number @public @property @_copy_docs_from_op_execution_context - def resources(self) -> Any: - return self.op_execution_context.resources + def op_config(self) -> Any: + return self.op_execution_context.op_config @property @_copy_docs_from_op_execution_context - def step_launcher(self) -> Optional[StepLauncher]: - return self.op_execution_context.step_launcher - - @_copy_docs_from_op_execution_context - def get_step_execution_context(self) -> StepExecutionContext: - return self.op_execution_context.get_step_execution_context() - - #### partition_related + def node_handle(self) -> NodeHandle: + return self.op_execution_context.node_handle - @public @property @_copy_docs_from_op_execution_context - def has_partition_key(self) -> bool: - return self.op_execution_context.has_partition_key + def op_handle(self) -> NodeHandle: + return self.op_execution_context.op_handle - @public @property @_copy_docs_from_op_execution_context - def partition_key(self) -> str: - return self.op_execution_context.partition_key + def op(self) -> Node: + return self.op_execution_context.op @public @property @_copy_docs_from_op_execution_context - def partition_keys(self) -> Sequence[str]: - return self.op_execution_context.partition_keys + def op_def(self) -> OpDefinition: + return self.op_execution_context.op_def - @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") - @public - @property @_copy_docs_from_op_execution_context - def asset_partition_key_range(self) -> PartitionKeyRange: - return self.op_execution_context.asset_partition_key_range + def describe_op(self) -> str: + return self.op_execution_context.describe_op() @public - @property @_copy_docs_from_op_execution_context - def partition_key_range(self) -> PartitionKeyRange: - return self.op_execution_context.partition_key_range + def get_mapping_key(self) -> Optional[str]: + return self.op_execution_context.get_mapping_key() @public @property @_copy_docs_from_op_execution_context - def partition_time_window(self) -> TimeWindow: - return self.op_execution_context.partition_time_window - - #### Event log related - - @_copy_docs_from_op_execution_context - def has_events(self) -> bool: - return self.op_execution_context.has_events() + def selected_output_names(self) -> AbstractSet[str]: + return self.op_execution_context.selected_output_names - @_copy_docs_from_op_execution_context - def consume_events(self) -> Iterator[DagsterEvent]: - yield from self.op_execution_context.consume_events() + #### job related @public + @property @_copy_docs_from_op_execution_context - def log_event(self, event: UserEvent) -> None: - return self.op_execution_context.log_event(event) + def job_name(self) -> str: + return self.op_execution_context.job_name @public + @property @_copy_docs_from_op_execution_context - def add_output_metadata( - self, - metadata: Mapping[str, Any], - output_name: Optional[str] = None, - mapping_key: Optional[str] = None, - ) -> None: - return self.op_execution_context.add_output_metadata( - metadata=metadata, output_name=output_name, mapping_key=mapping_key - ) - - @_copy_docs_from_op_execution_context - def get_output_metadata( - self, output_name: str, mapping_key: Optional[str] = None - ) -> Optional[Mapping[str, Any]]: - return self.op_execution_context.get_output_metadata( - output_name=output_name, mapping_key=mapping_key - ) - - @_copy_docs_from_op_execution_context - def describe_op(self) -> str: - return self.op_execution_context.describe_op() + def job_def(self) -> JobDefinition: + return self.op_execution_context.job_def - @public - @_copy_docs_from_op_execution_context - def get_mapping_key(self) -> Optional[str]: - return self.op_execution_context.get_mapping_key() + #### asset related @public @property @@ -1534,50 +1490,82 @@ def has_assets_def(self) -> bool: def assets_def(self) -> AssetsDefinition: return self.op_execution_context.assets_def + @public + @_copy_docs_from_op_execution_context + def asset_key_for_output(self, output_name: str = "result") -> AssetKey: + return self.op_execution_context.asset_key_for_output(output_name=output_name) + + @public + @_copy_docs_from_op_execution_context + def output_for_asset_key(self, asset_key: AssetKey) -> str: + return self.op_execution_context.output_for_asset_key(asset_key=asset_key) + + @public + @_copy_docs_from_op_execution_context + def asset_key_for_input(self, input_name: str) -> AssetKey: + return self.op_execution_context.asset_key_for_input(input_name=input_name) + @public @property @_copy_docs_from_op_execution_context def selected_asset_keys(self) -> AbstractSet[AssetKey]: return self.op_execution_context.selected_asset_keys + #### execution related + @public @property @_copy_docs_from_op_execution_context - def has_asset_checks_def(self) -> bool: - return self.op_execution_context.has_asset_checks_def + def instance(self) -> DagsterInstance: + return self.op_execution_context.instance + + @property + @_copy_docs_from_op_execution_context + def step_launcher(self) -> Optional[StepLauncher]: + return self.op_execution_context.step_launcher + + @_copy_docs_from_op_execution_context + def get_step_execution_context(self) -> StepExecutionContext: + return self.op_execution_context.get_step_execution_context() + + #### partition_related @public @property @_copy_docs_from_op_execution_context - def asset_checks_def(self) -> AssetChecksDefinition: - return self.op_execution_context.asset_checks_def + def has_partition_key(self) -> bool: + return self.op_execution_context.has_partition_key @public @property @_copy_docs_from_op_execution_context - def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]: - return self.op_execution_context.selected_asset_check_keys + def partition_key(self) -> str: + return self.op_execution_context.partition_key @public @property @_copy_docs_from_op_execution_context - def selected_output_names(self) -> AbstractSet[str]: - return self.op_execution_context.selected_output_names + def partition_keys(self) -> Sequence[str]: + return self.op_execution_context.partition_keys + @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") @public + @property @_copy_docs_from_op_execution_context - def asset_key_for_output(self, output_name: str = "result") -> AssetKey: - return self.op_execution_context.asset_key_for_output(output_name=output_name) + def asset_partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range @public + @property @_copy_docs_from_op_execution_context - def output_for_asset_key(self, asset_key: AssetKey) -> str: - return self.op_execution_context.output_for_asset_key(asset_key=asset_key) + def partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.partition_key_range @public + @property @_copy_docs_from_op_execution_context - def asset_key_for_input(self, input_name: str) -> AssetKey: - return self.op_execution_context.asset_key_for_input(input_name=input_name) + def partition_time_window(self) -> TimeWindow: + return self.op_execution_context.partition_time_window @public @_copy_docs_from_op_execution_context @@ -1631,6 +1619,70 @@ def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: return self.op_execution_context.asset_partitions_time_window_for_input(input_name) + #### Event log related + + @_copy_docs_from_op_execution_context + def has_events(self) -> bool: + return self.op_execution_context.has_events() + + @_copy_docs_from_op_execution_context + def consume_events(self) -> Iterator[DagsterEvent]: + yield from self.op_execution_context.consume_events() + + @public + @_copy_docs_from_op_execution_context + def log_event(self, event: UserEvent) -> None: + return self.op_execution_context.log_event(event) + + #### metadata related + + @public + @_copy_docs_from_op_execution_context + def add_output_metadata( + self, + metadata: Mapping[str, Any], + output_name: Optional[str] = None, + mapping_key: Optional[str] = None, + ) -> None: + return self.op_execution_context.add_output_metadata( + metadata=metadata, output_name=output_name, mapping_key=mapping_key + ) + + @_copy_docs_from_op_execution_context + def get_output_metadata( + self, output_name: str, mapping_key: Optional[str] = None + ) -> Optional[Mapping[str, Any]]: + return self.op_execution_context.get_output_metadata( + output_name=output_name, mapping_key=mapping_key + ) + + #### asset check related + + @public + @property + @_copy_docs_from_op_execution_context + def has_asset_checks_def(self) -> bool: + return self.op_execution_context.has_asset_checks_def + + @public + @property + @_copy_docs_from_op_execution_context + def asset_checks_def(self) -> AssetChecksDefinition: + return self.op_execution_context.asset_checks_def + + @public + @property + @_copy_docs_from_op_execution_context + def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return self.op_execution_context.selected_asset_check_keys + + @property + @_copy_docs_from_op_execution_context + def asset_check_spec(self) -> AssetCheckSpec: + return self.op_execution_context.asset_check_spec + + #### data lineage related + @public @experimental @_copy_docs_from_op_execution_context @@ -1643,10 +1695,25 @@ def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> No asset_key=asset_key, data_version=data_version ) + # misc + + @public @property @_copy_docs_from_op_execution_context - def asset_check_spec(self) -> AssetCheckSpec: - return self.op_execution_context.asset_check_spec + def resources(self) -> Any: + return self.op_execution_context.resources + + @public + @property + @_copy_docs_from_op_execution_context + def log(self) -> DagsterLogManager: + return self.op_execution_context.log + + @public + @property + @_copy_docs_from_op_execution_context + def pdb(self) -> ForkedPdb: + return self.op_execution_context.pdb @property @_copy_docs_from_op_execution_context