Skip to content

Commit

Permalink
[1.9] Promote hidden params to top-level args in AutomationConditionS…
Browse files Browse the repository at this point in the history
…ensorDefinition
  • Loading branch information
OwenKephart committed Oct 17, 2024
1 parent 74938cc commit b3c916d
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ class AutomationConditionSensorDefinition(SensorDefinition):
description (Optional[str]): A human-readable description of the sensor.
emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to True.
use_user_code_server (bool): (experimental) If set to True, this sensor will be evaluated in the user
code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition
subclasses, and ensures that the condition definitions will remain in sync with your user code
version, eliminating version skew. Note: currently a maximum of 500 assets or checks may be
targeted at a time by a sensor that has this value set.
default_condition (Optional[AutomationCondition]): (experimental) If provided, this condition will
be used for any selected assets or asset checks which do not have an automation condition defined.
Requires `use_user_code_server` to be set to `True`.
"""

def __init__(
Expand All @@ -108,16 +116,17 @@ def __init__(
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
emit_backfills: bool = True,
**kwargs,
use_user_code_server: bool = False,
default_condition: Optional[AutomationCondition] = None,
):
self._user_code = kwargs.get("user_code", False)
check.bool_param(emit_backfills, "emit_backfills")
self._use_user_code_server = use_user_code_server
check.bool_param(emit_backfills, "allow_backfills")

self._default_condition = check.opt_inst_param(
kwargs.get("default_condition"), "default_condition", AutomationCondition
default_condition, "default_condition", AutomationCondition
)
check.param_invariant(
not (self._default_condition and not self._user_code),
not (self._default_condition and not self._use_user_code_server),
"default_condition",
"Setting a `default_condition` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)
Expand All @@ -131,7 +140,7 @@ def __init__(
super().__init__(
name=check_valid_name(name),
job_name=None,
evaluation_fn=partial(_evaluate, self) if self._user_code else not_supported,
evaluation_fn=partial(_evaluate, self) if self._use_user_code_server else not_supported,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=None,
Expand Down Expand Up @@ -161,4 +170,4 @@ def default_condition(self) -> Optional[AutomationCondition]:

@property
def sensor_type(self) -> SensorType:
return SensorType.AUTOMATION if self._user_code else SensorType.AUTO_MATERIALIZE
return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def get_defs(n: int) -> Definitions:
return Definitions(
assets=assets,
sensors=[
AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def E() -> None: ...
defs = dg.Definitions(
assets=[A, B, C, D, E],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def outside2() -> dg.AssetCheckResult: ...
assets=[backfillA, backfillB, backfillC, run1, run2],
asset_checks=[outsideA, outsideB, outside1, outside2],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def foo() -> None: ...
defs = dg.Definitions(
assets=[foo],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def every_5_minutes_asset() -> None: ...
name="all_assets",
asset_selection=dg.AssetSelection.all(),
default_condition=dg.AutomationCondition.cron_tick_passed("*/5 * * * *"),
user_code=True,
use_user_code_server=True,
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_constructor(selection: AssetSelection, user_code: bool) -> None:
description="fdsjkl",
default_status=DefaultSensorStatus.RUNNING,
minimum_interval_seconds=50,
user_code=user_code,
use_user_code_server=user_code,
)
assert automation_sensor.name == "foo"
assert automation_sensor.run_tags == tags
Expand All @@ -50,13 +50,18 @@ def test_default_condition() -> None:
)

sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True
"foo",
asset_selection="*",
default_condition=AutomationCondition.eager(),
use_user_code_server=True,
)
assert sensor.default_condition == AutomationCondition.eager()


def test_limits() -> None:
sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True)
sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", use_user_code_server=True
)

defs = Definitions(
assets=build_assets(
Expand Down

0 comments on commit b3c916d

Please sign in to comment.