Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 22, 2023
1 parent 8414e24 commit 7ef9fad
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,16 @@ def create_assets_def_from_source_asset(source_asset: SourceAsset):
kwargs["resource_defs"] = source_asset.resource_defs

@asset(**kwargs)
def shim_asset(context: OpExecutionContext) -> None:
def shim_asset(context: OpExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return op_function.decorated_fn(context)
return_value = op_function.decorated_fn(context)
check.invariant(
return_value is None,
"The wrapped decorated_fn should return a value. If this changes, this code path must"
" changed to process the events appopriately.",
)

return shim_asset
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
def fn(context: OpExecutionContext) -> None:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def an_observable_source_asset() -> DataVersion:
result = defs.get_implicit_global_asset_job_def().execute_in_process()

assert result.success
assert result.output_for_node("an_observable_source_asset") is None

all_observations = result.get_asset_observation_events()
assert len(all_observations) == 1
Expand Down

0 comments on commit 7ef9fad

Please sign in to comment.