diff --git a/python_modules/dagster-pipes/dagster_pipes/__init__.py b/python_modules/dagster-pipes/dagster_pipes/__init__.py index 379c8176a691d..fc0cf9fbc2755 100644 --- a/python_modules/dagster-pipes/dagster_pipes/__init__.py +++ b/python_modules/dagster-pipes/dagster_pipes/__init__.py @@ -623,6 +623,70 @@ def make_channel( # ######################## +class IContext(ABC): + """Base class for asset context implemented by AssetExecutionContext and ExtContext.""" + + @property + @abstractmethod + def asset_key(self): + """The AssetKey for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ + + @property + @abstractmethod + def asset_keys(self): + """The AssetKeys for the asset being materialized. If no asset is being materialized, errors.""" + + @property + @abstractmethod + def provenance(self): + """The data provenance for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ + + @property + @abstractmethod + def provenance_by_asset_key(self): + """A dictionary of data provenance for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ + + @property + @abstractmethod + def code_version(self): + """The code version for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ + + @property + @abstractmethod + def code_version_by_asset_key(self): + """A dictionary of code versions for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ + + @property + @abstractmethod + def is_partitioned(self) -> bool: + """True if the current execution is partitioned.""" + + @property + @abstractmethod + def run_id(self) -> str: + """The run id of the current execution.""" + + @property + @abstractmethod + def job_name(self) -> Optional[str]: + """The name of the job that is executing.""" + + @property + @abstractmethod + def retry_number(self) -> int: + """The number of retries of this execution.""" + + def init_dagster_ext( *, context_loader: Optional[ExtContextLoader] = None, @@ -650,7 +714,7 @@ def init_dagster_ext( return context -class ExtContext: +class ExtContext(IContext): _instance: ClassVar[Optional["ExtContext"]] = None @classmethod @@ -734,7 +798,7 @@ def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: return code_version_by_asset_key @property - def is_partition_step(self) -> bool: + def is_partitioned(self) -> bool: return self._data["partition_key_range"] is not None @property diff --git a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py index c3dcf16d4c2e7..3f846e73083ff 100644 --- a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py +++ b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py @@ -132,7 +132,7 @@ def test_multi_asset_context(): def test_no_partition_context(): context = _make_external_execution_context() - assert not context.is_partition_step + assert not context.is_partitioned _assert_undefined(context, "partition_key") _assert_undefined(context, "partition_key_range") _assert_undefined(context, "partition_time_window") @@ -147,7 +147,7 @@ def test_single_partition_context(): partition_time_window=None, ) - assert context.is_partition_step + assert context.is_partitioned assert context.partition_key == "foo" assert context.partition_key_range == partition_key_range assert context.partition_time_window is None @@ -163,7 +163,7 @@ def test_multiple_partition_context(): partition_time_window=time_window, ) - assert context.is_partition_step + assert context.is_partitioned _assert_undefined(context, "partition_key") assert context.partition_key_range == partition_key_range assert context.partition_time_window == time_window