Skip to content

Commit

Permalink
organize methods by topic
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 18, 2023
1 parent 141d6ab commit 151dccc
Showing 1 changed file with 170 additions and 103 deletions.
273 changes: 170 additions & 103 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1370,151 +1370,107 @@ def get() -> "AssetExecutionContext":
def op_execution_context(self) -> OpExecutionContext:
return self._op_execution_context

#### op-related
#### Run related

@property
@_copy_docs_from_op_execution_context
def retry_number(self):
return self.op_execution_context.retry_number
def run(self) -> DagsterRun:
return self.op_execution_context.run

@public
@property
@_copy_docs_from_op_execution_context
def op_config(self) -> Any:
return self.op_execution_context.op_config
def dagster_run(self) -> DagsterRun:
return self.op_execution_context.dagster_run

@public
@property
@_copy_docs_from_op_execution_context
def job_name(self) -> str:
return self.op_execution_context.job_name
def run_id(self) -> str:
return self.op_execution_context.run_id

@property
@_copy_docs_from_op_execution_context
def node_handle(self) -> NodeHandle:
return self.op_execution_context.node_handle
def run_config(self) -> Mapping[str, object]:
return self.op_execution_context.run_config

@property
@_copy_docs_from_op_execution_context
def op_handle(self) -> NodeHandle:
return self.op_execution_context.op_handle
def run_tags(self) -> Mapping[str, str]:
return self.op_execution_context.run_tags

@property
@public
@_copy_docs_from_op_execution_context
def op(self) -> Node:
return self.op_execution_context.op
def has_tag(self, key: str) -> bool:
return self.op_execution_context.has_tag(key)

@public
@property
@_copy_docs_from_op_execution_context
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def
def get_tag(self, key: str) -> Optional[str]:
return self.op_execution_context.get_tag(key)

#### execution related
#### op related

@public
@property
@_copy_docs_from_op_execution_context
def instance(self) -> DagsterInstance:
return self.op_execution_context.instance
def retry_number(self):
return self.op_execution_context.retry_number

@public
@property
@_copy_docs_from_op_execution_context
def resources(self) -> Any:
return self.op_execution_context.resources
def op_config(self) -> Any:
return self.op_execution_context.op_config

@property
@_copy_docs_from_op_execution_context
def step_launcher(self) -> Optional[StepLauncher]:
return self.op_execution_context.step_launcher

@_copy_docs_from_op_execution_context
def get_step_execution_context(self) -> StepExecutionContext:
return self.op_execution_context.get_step_execution_context()

#### partition_related
def node_handle(self) -> NodeHandle:
return self.op_execution_context.node_handle

@public
@property
@_copy_docs_from_op_execution_context
def has_partition_key(self) -> bool:
return self.op_execution_context.has_partition_key
def op_handle(self) -> NodeHandle:
return self.op_execution_context.op_handle

@public
@property
@_copy_docs_from_op_execution_context
def partition_key(self) -> str:
return self.op_execution_context.partition_key
def op(self) -> Node:
return self.op_execution_context.op

@public
@property
@_copy_docs_from_op_execution_context
def partition_keys(self) -> Sequence[str]:
return self.op_execution_context.partition_keys
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def

@deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.")
@public
@property
@_copy_docs_from_op_execution_context
def asset_partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range
def describe_op(self) -> str:
return self.op_execution_context.describe_op()

@public
@property
@_copy_docs_from_op_execution_context
def partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.partition_key_range
def get_mapping_key(self) -> Optional[str]:
return self.op_execution_context.get_mapping_key()

@public
@property
@_copy_docs_from_op_execution_context
def partition_time_window(self) -> TimeWindow:
return self.op_execution_context.partition_time_window

#### Event log related

@_copy_docs_from_op_execution_context
def has_events(self) -> bool:
return self.op_execution_context.has_events()
def selected_output_names(self) -> AbstractSet[str]:
return self.op_execution_context.selected_output_names

@_copy_docs_from_op_execution_context
def consume_events(self) -> Iterator[DagsterEvent]:
yield from self.op_execution_context.consume_events()
#### job related

@public
@property
@_copy_docs_from_op_execution_context
def log_event(self, event: UserEvent) -> None:
return self.op_execution_context.log_event(event)
def job_name(self) -> str:
return self.op_execution_context.job_name

@public
@property
@_copy_docs_from_op_execution_context
def add_output_metadata(
self,
metadata: Mapping[str, Any],
output_name: Optional[str] = None,
mapping_key: Optional[str] = None,
) -> None:
return self.op_execution_context.add_output_metadata(
metadata=metadata, output_name=output_name, mapping_key=mapping_key
)

@_copy_docs_from_op_execution_context
def get_output_metadata(
self, output_name: str, mapping_key: Optional[str] = None
) -> Optional[Mapping[str, Any]]:
return self.op_execution_context.get_output_metadata(
output_name=output_name, mapping_key=mapping_key
)

@_copy_docs_from_op_execution_context
def describe_op(self) -> str:
return self.op_execution_context.describe_op()
def job_def(self) -> JobDefinition:
return self.op_execution_context.job_def

@public
@_copy_docs_from_op_execution_context
def get_mapping_key(self) -> Optional[str]:
return self.op_execution_context.get_mapping_key()
#### asset related

@public
@property
Expand All @@ -1534,50 +1490,82 @@ def has_assets_def(self) -> bool:
def assets_def(self) -> AssetsDefinition:
return self.op_execution_context.assets_def

@public
@_copy_docs_from_op_execution_context
def asset_key_for_output(self, output_name: str = "result") -> AssetKey:
return self.op_execution_context.asset_key_for_output(output_name=output_name)

@public
@_copy_docs_from_op_execution_context
def output_for_asset_key(self, asset_key: AssetKey) -> str:
return self.op_execution_context.output_for_asset_key(asset_key=asset_key)

@public
@_copy_docs_from_op_execution_context
def asset_key_for_input(self, input_name: str) -> AssetKey:
return self.op_execution_context.asset_key_for_input(input_name=input_name)

@public
@property
@_copy_docs_from_op_execution_context
def selected_asset_keys(self) -> AbstractSet[AssetKey]:
return self.op_execution_context.selected_asset_keys

#### execution related

@public
@property
@_copy_docs_from_op_execution_context
def has_asset_checks_def(self) -> bool:
return self.op_execution_context.has_asset_checks_def
def instance(self) -> DagsterInstance:
return self.op_execution_context.instance

@property
@_copy_docs_from_op_execution_context
def step_launcher(self) -> Optional[StepLauncher]:
return self.op_execution_context.step_launcher

@_copy_docs_from_op_execution_context
def get_step_execution_context(self) -> StepExecutionContext:
return self.op_execution_context.get_step_execution_context()

#### partition_related

@public
@property
@_copy_docs_from_op_execution_context
def asset_checks_def(self) -> AssetChecksDefinition:
return self.op_execution_context.asset_checks_def
def has_partition_key(self) -> bool:
return self.op_execution_context.has_partition_key

@public
@property
@_copy_docs_from_op_execution_context
def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return self.op_execution_context.selected_asset_check_keys
def partition_key(self) -> str:
return self.op_execution_context.partition_key

@public
@property
@_copy_docs_from_op_execution_context
def selected_output_names(self) -> AbstractSet[str]:
return self.op_execution_context.selected_output_names
def partition_keys(self) -> Sequence[str]:
return self.op_execution_context.partition_keys

@deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.")
@public
@property
@_copy_docs_from_op_execution_context
def asset_key_for_output(self, output_name: str = "result") -> AssetKey:
return self.op_execution_context.asset_key_for_output(output_name=output_name)
def asset_partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range

@public
@property
@_copy_docs_from_op_execution_context
def output_for_asset_key(self, asset_key: AssetKey) -> str:
return self.op_execution_context.output_for_asset_key(asset_key=asset_key)
def partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.partition_key_range

@public
@property
@_copy_docs_from_op_execution_context
def asset_key_for_input(self, input_name: str) -> AssetKey:
return self.op_execution_context.asset_key_for_input(input_name=input_name)
def partition_time_window(self) -> TimeWindow:
return self.op_execution_context.partition_time_window

@public
@_copy_docs_from_op_execution_context
Expand Down Expand Up @@ -1631,6 +1619,70 @@ def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]:
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow:
return self.op_execution_context.asset_partitions_time_window_for_input(input_name)

#### Event log related

@_copy_docs_from_op_execution_context
def has_events(self) -> bool:
return self.op_execution_context.has_events()

@_copy_docs_from_op_execution_context
def consume_events(self) -> Iterator[DagsterEvent]:
yield from self.op_execution_context.consume_events()

@public
@_copy_docs_from_op_execution_context
def log_event(self, event: UserEvent) -> None:
return self.op_execution_context.log_event(event)

#### metadata related

@public
@_copy_docs_from_op_execution_context
def add_output_metadata(
self,
metadata: Mapping[str, Any],
output_name: Optional[str] = None,
mapping_key: Optional[str] = None,
) -> None:
return self.op_execution_context.add_output_metadata(
metadata=metadata, output_name=output_name, mapping_key=mapping_key
)

@_copy_docs_from_op_execution_context
def get_output_metadata(
self, output_name: str, mapping_key: Optional[str] = None
) -> Optional[Mapping[str, Any]]:
return self.op_execution_context.get_output_metadata(
output_name=output_name, mapping_key=mapping_key
)

#### asset check related

@public
@property
@_copy_docs_from_op_execution_context
def has_asset_checks_def(self) -> bool:
return self.op_execution_context.has_asset_checks_def

@public
@property
@_copy_docs_from_op_execution_context
def asset_checks_def(self) -> AssetChecksDefinition:
return self.op_execution_context.asset_checks_def

@public
@property
@_copy_docs_from_op_execution_context
def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return self.op_execution_context.selected_asset_check_keys

@property
@_copy_docs_from_op_execution_context
def asset_check_spec(self) -> AssetCheckSpec:
return self.op_execution_context.asset_check_spec

#### data lineage related

@public
@experimental
@_copy_docs_from_op_execution_context
Expand All @@ -1643,10 +1695,25 @@ def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> No
asset_key=asset_key, data_version=data_version
)

# misc

@public
@property
@_copy_docs_from_op_execution_context
def asset_check_spec(self) -> AssetCheckSpec:
return self.op_execution_context.asset_check_spec
def resources(self) -> Any:
return self.op_execution_context.resources

@public
@property
@_copy_docs_from_op_execution_context
def log(self) -> DagsterLogManager:
return self.op_execution_context.log

@public
@property
@_copy_docs_from_op_execution_context
def pdb(self) -> ForkedPdb:
return self.op_execution_context.pdb

@property
@_copy_docs_from_op_execution_context
Expand Down

0 comments on commit 151dccc

Please sign in to comment.