Skip to content

Commit

Permalink
add run prop test
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 8, 2023
1 parent e30840a commit 3ca5a82
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No
self._typed_event_stream_error_message = error_message


class RunlessRunProperties(RunProperties):
@property
def dagster_run(self):
raise DagsterInvalidPropertyError(_property_msg("dagster_run", "property"))


class RunlessOpExecutionContext(OpExecutionContext, BaseRunlessContext):
"""The ``context`` object available as the first argument to an op's compute function when
being invoked directly. Can also be used as a context manager.
Expand Down Expand Up @@ -829,7 +835,7 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
self._check_bound(fn_name="for_type", fn_type="method")
resources = cast(NamedTuple, self.resources)
return TypeCheckContext(
self.run_id,
self.run_properties.run_id,
self.log,
ScopedResourcesBuilder(resources._asdict()),
dagster_type,
Expand All @@ -842,11 +848,12 @@ def observe_output(self, output_name: str, mapping_key: Optional[str] = None) ->
def run_properties(self) -> RunProperties:
self._check_bound(fn_name="run_properties", fn_type="property")
if self._run_props is None:
self._run_props = RunProperties(
self._run_props = RunlessRunProperties(
run_id=self.op_execution_context.run_id,
run_config=self.op_execution_context.run_config,
dagster_run=self.op_execution_context.run,
retry_number=self.op_execution_context.retry_number,
# pass None for dagster_run, since RunlessRunProperties raises an exception for this attr
dagster_run=None, # type: ignore
retry_number=0,
)
return self._run_props

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1589,3 +1589,17 @@ async def get_results():
asyncio.run(get_results())

assert_context_unbound(ctx)


def test_run_properties_access():
@asset
def access_run_properties(context: AssetExecutionContext):
assert context.run_properties.run_id == "EPHEMERAL"
assert context.run_properties.retry_number == 0

with pytest.raises(DagsterInvalidPropertyError):
context.run_properties.dagster_run # noqa:B018

ctx = build_asset_context()

access_run_properties(ctx)

0 comments on commit 3ca5a82

Please sign in to comment.