Skip to content

Commit

Permalink
[1.9] Promote hidden params to top-level args in AutomationConditionS…
Browse files Browse the repository at this point in the history
…ensorDefinition
  • Loading branch information
OwenKephart committed Oct 11, 2024
1 parent b7da539 commit f1e8a25
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ class AutomationConditionSensorDefinition(SensorDefinition):
description (Optional[str]): A human-readable description of the sensor.
emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to True.
use_user_code_server (bool): (experimental) If set to True, this sensor will be evaluated in the user
code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition
subclasses.
default_condition (Optional[AutomationCondition]): (experimental) If provided, this condition will
be used for any selected assets or asset checks which do not have an automation condition defined.
Requires `use_user_code_server` to be set to `True`.
"""

def __init__(
Expand All @@ -106,16 +112,17 @@ def __init__(
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
emit_backfills: bool = True,
**kwargs,
use_user_code_server: bool = False,
default_condition: Optional[AutomationCondition] = None,
):
self._user_code = kwargs.get("user_code", False)
check.bool_param(emit_backfills, "emit_backfills")
self._use_user_code_server = use_user_code_server
check.bool_param(emit_backfills, "allow_backfills")

self._default_condition = check.opt_inst_param(
kwargs.get("default_condition"), "default_condition", AutomationCondition
default_condition, "default_condition", AutomationCondition
)
check.param_invariant(
not (self._default_condition and not self._user_code),
not (self._default_condition and not self._use_user_code_server),
"default_condition",
"Setting a `default_condition` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)
Expand All @@ -129,7 +136,7 @@ def __init__(
super().__init__(
name=check_valid_name(name),
job_name=None,
evaluation_fn=partial(_evaluate, self) if self._user_code else not_supported,
evaluation_fn=partial(_evaluate, self) if self._use_user_code_server else not_supported,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=None,
Expand Down Expand Up @@ -159,4 +166,4 @@ def default_condition(self) -> Optional[AutomationCondition]:

@property
def sensor_type(self) -> SensorType:
return SensorType.AUTOMATION if self._user_code else SensorType.AUTO_MATERIALIZE
return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def E() -> None: ...
defs = dg.Definitions(
assets=[A, B, C, D, E],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def outside2() -> dg.AssetCheckResult: ...
assets=[backfillA, backfillB, backfillC, run1, run2],
asset_checks=[outsideA, outsideB, outside1, outside2],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def foo() -> None: ...
defs = dg.Definitions(
assets=[foo],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def every_5_minutes_asset() -> None: ...
name="all_assets",
asset_selection=dg.AssetSelection.all(),
default_condition=dg.AutomationCondition.cron_tick_passed("*/5 * * * *"),
user_code=True,
use_user_code_server=True,
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def get_defs(n: int) -> Definitions:
return Definitions(
assets=assets,
sensors=[
AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_constructor(selection: AssetSelection, user_code: bool) -> None:
description="fdsjkl",
default_status=DefaultSensorStatus.RUNNING,
minimum_interval_seconds=50,
user_code=user_code,
use_user_code_server=user_code,
)
assert automation_sensor.name == "foo"
assert automation_sensor.run_tags == tags
Expand All @@ -49,13 +49,18 @@ def test_default_condition() -> None:
)

sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True
"foo",
asset_selection="*",
default_condition=AutomationCondition.eager(),
use_user_code_server=True,
)
assert sensor.default_condition == AutomationCondition.eager()


def test_limits() -> None:
sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True)
sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", use_user_code_server=True
)

defs = Definitions(
assets=build_assets(
Expand Down

0 comments on commit f1e8a25

Please sign in to comment.