Skip to content

Commit

Permalink
[1.9] Promote hidden params to top-level args in AutomationConditionS…
Browse files Browse the repository at this point in the history
…ensorDefinition (dagster-io#25206)

## Summary & Motivation

As title -- these should be documented and visible to users.

Open to suggestions on `use_user_code_server` as a name.

## How I Tested These Changes

## Changelog

Added a new `use_user_code_server` paramter to `AutomationConditionSensorDefinition`. If set, the sensor will be evaluated in the user code server (as traditional sensors are), allowing custom `AutomationCondition` subclasses to be evaluated. To learn more, view the docs [TODO].

Added a new `default_condition` parameter to `AutomationConditionSensorDefinition`. If set, this condition will be used for any assets or asset checks within the selection that do not have an automation condition defined. Requires `use_user_code_server` to be set.
  • Loading branch information
OwenKephart authored and Grzyblon committed Oct 26, 2024
1 parent 4212e5b commit 0d2b7c8
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ class AutomationConditionSensorDefinition(SensorDefinition):
description (Optional[str]): A human-readable description of the sensor.
emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to True.
use_user_code_server (bool): (experimental) If set to True, this sensor will be evaluated in the user
code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition
subclasses, and ensures that the condition definitions will remain in sync with your user code
version, eliminating version skew. Note: currently a maximum of 500 assets or checks may be
targeted at a time by a sensor that has this value set.
default_condition (Optional[AutomationCondition]): (experimental) If provided, this condition will
be used for any selected assets or asset checks which do not have an automation condition defined.
Requires `use_user_code_server` to be set to `True`.
"""

def __init__(
Expand All @@ -108,16 +116,17 @@ def __init__(
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
emit_backfills: bool = True,
**kwargs,
use_user_code_server: bool = False,
default_condition: Optional[AutomationCondition] = None,
):
self._user_code = kwargs.get("user_code", False)
check.bool_param(emit_backfills, "emit_backfills")
self._use_user_code_server = use_user_code_server
check.bool_param(emit_backfills, "allow_backfills")

self._default_condition = check.opt_inst_param(
kwargs.get("default_condition"), "default_condition", AutomationCondition
default_condition, "default_condition", AutomationCondition
)
check.param_invariant(
not (self._default_condition and not self._user_code),
not (self._default_condition and not self._use_user_code_server),
"default_condition",
"Setting a `default_condition` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)
Expand All @@ -131,7 +140,7 @@ def __init__(
super().__init__(
name=check_valid_name(name),
job_name=None,
evaluation_fn=partial(_evaluate, self) if self._user_code else not_supported,
evaluation_fn=partial(_evaluate, self) if self._use_user_code_server else not_supported,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=None,
Expand Down Expand Up @@ -161,4 +170,4 @@ def default_condition(self) -> Optional[AutomationCondition]:

@property
def sensor_type(self) -> SensorType:
return SensorType.AUTOMATION if self._user_code else SensorType.AUTO_MATERIALIZE
return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def get_defs(n: int) -> Definitions:
return Definitions(
assets=assets,
sensors=[
AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def E() -> None: ...
defs = dg.Definitions(
assets=[A, B, C, D, E],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def outside2() -> dg.AssetCheckResult: ...
assets=[backfillA, backfillB, backfillC, run1, run2],
asset_checks=[outsideA, outsideB, outside1, outside2],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def foo() -> None: ...
defs = dg.Definitions(
assets=[foo],
sensors=[
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def every_5_minutes_asset() -> None: ...
name="all_assets",
asset_selection=dg.AssetSelection.all(),
default_condition=dg.AutomationCondition.cron_tick_passed("*/5 * * * *"),
user_code=True,
use_user_code_server=True,
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def get_defs() -> dg.Definitions:
assets=simple_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", user_code=False
name="the_sensor", asset_selection="*", use_user_code_server=False
)
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def get_defs() -> dg.Definitions:
assets=simple_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", user_code=True
name="the_sensor", asset_selection="*", use_user_code_server=True
)
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_constructor(selection: AssetSelection, user_code: bool) -> None:
description="fdsjkl",
default_status=DefaultSensorStatus.RUNNING,
minimum_interval_seconds=50,
user_code=user_code,
use_user_code_server=user_code,
)
assert automation_sensor.name == "foo"
assert automation_sensor.run_tags == tags
Expand All @@ -50,13 +50,18 @@ def test_default_condition() -> None:
)

sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True
"foo",
asset_selection="*",
default_condition=AutomationCondition.eager(),
use_user_code_server=True,
)
assert sensor.default_condition == AutomationCondition.eager()


def test_limits() -> None:
sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True)
sensor = AutomationConditionSensorDefinition(
"foo", asset_selection="*", use_user_code_server=True
)

defs = Definitions(
assets=build_assets(
Expand Down

0 comments on commit 0d2b7c8

Please sign in to comment.