From f1e8a25778490480b80f1a7221f83e7c5f982b12 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Thu, 10 Oct 2024 16:04:59 -0700 Subject: [PATCH] [1.9] Promote hidden params to top-level args in AutomationConditionSensorDefinition --- .../automation_condition_sensor_definition.py | 21 ++++++++++++------- .../definitions/backfill_simple_user_code.py | 4 +++- .../backfill_with_runs_and_checks.py | 4 +++- .../definitions/custom_condition.py | 4 +++- .../definitions/default_condition.py | 2 +- .../definitions/massive_user_code.py | 4 +++- ..._automation_condition_sensor_definition.py | 11 +++++++--- 7 files changed, 35 insertions(+), 15 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index bb5a41ef59b83..376d09c970ec8 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -92,6 +92,12 @@ class AutomationConditionSensorDefinition(SensorDefinition): description (Optional[str]): A human-readable description of the sensor. emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition of any single asset is requested, rather than individual runs. Defaults to True. + use_user_code_server (bool): (experimental) If set to True, this sensor will be evaluated in the user + code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition + subclasses. + default_condition (Optional[AutomationCondition]): (experimental) If provided, this condition will + be used for any selected assets or asset checks which do not have an automation condition defined. + Requires `use_user_code_server` to be set to `True`. """ def __init__( @@ -106,16 +112,17 @@ def __init__( description: Optional[str] = None, metadata: Optional[Mapping[str, object]] = None, emit_backfills: bool = True, - **kwargs, + use_user_code_server: bool = False, + default_condition: Optional[AutomationCondition] = None, ): - self._user_code = kwargs.get("user_code", False) - check.bool_param(emit_backfills, "emit_backfills") + self._use_user_code_server = use_user_code_server + check.bool_param(emit_backfills, "allow_backfills") self._default_condition = check.opt_inst_param( - kwargs.get("default_condition"), "default_condition", AutomationCondition + default_condition, "default_condition", AutomationCondition ) check.param_invariant( - not (self._default_condition and not self._user_code), + not (self._default_condition and not self._use_user_code_server), "default_condition", "Setting a `default_condition` for a non-user-code AutomationConditionSensorDefinition is not supported.", ) @@ -129,7 +136,7 @@ def __init__( super().__init__( name=check_valid_name(name), job_name=None, - evaluation_fn=partial(_evaluate, self) if self._user_code else not_supported, + evaluation_fn=partial(_evaluate, self) if self._use_user_code_server else not_supported, minimum_interval_seconds=minimum_interval_seconds, description=description, job=None, @@ -159,4 +166,4 @@ def default_condition(self) -> Optional[AutomationCondition]: @property def sensor_type(self) -> SensorType: - return SensorType.AUTOMATION if self._user_code else SensorType.AUTO_MATERIALIZE + return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py index a09992573e5db..3b202d9fde7bb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py @@ -46,6 +46,8 @@ def E() -> None: ... defs = dg.Definitions( assets=[A, B, C, D, E], sensors=[ - dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + dg.AutomationConditionSensorDefinition( + "the_sensor", asset_selection="*", use_user_code_server=True + ) ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_with_runs_and_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_with_runs_and_checks.py index f57daaa6997ca..8195d67dffb8b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_with_runs_and_checks.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_with_runs_and_checks.py @@ -71,6 +71,8 @@ def outside2() -> dg.AssetCheckResult: ... assets=[backfillA, backfillB, backfillC, run1, run2], asset_checks=[outsideA, outsideB, outside1, outside2], sensors=[ - dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + dg.AutomationConditionSensorDefinition( + "the_sensor", asset_selection="*", use_user_code_server=True + ) ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/custom_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/custom_condition.py index 9b6b6af5a686c..abca0cc5d15c1 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/custom_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/custom_condition.py @@ -21,6 +21,8 @@ def foo() -> None: ... defs = dg.Definitions( assets=[foo], sensors=[ - dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + dg.AutomationConditionSensorDefinition( + "the_sensor", asset_selection="*", use_user_code_server=True + ) ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py index be9cf31cfb5a4..47f2b3bc8232c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py @@ -16,7 +16,7 @@ def every_5_minutes_asset() -> None: ... name="all_assets", asset_selection=dg.AssetSelection.all(), default_condition=dg.AutomationCondition.cron_tick_passed("*/5 * * * *"), - user_code=True, + use_user_code_server=True, ) ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py index 7a44087659f00..8c5f4c19f7759 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py @@ -27,7 +27,9 @@ def get_defs(n: int) -> Definitions: return Definitions( assets=assets, sensors=[ - AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + AutomationConditionSensorDefinition( + "the_sensor", asset_selection="*", use_user_code_server=True + ) ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py index 56dc0319e525a..0bec08a7ca73f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py @@ -25,7 +25,7 @@ def test_constructor(selection: AssetSelection, user_code: bool) -> None: description="fdsjkl", default_status=DefaultSensorStatus.RUNNING, minimum_interval_seconds=50, - user_code=user_code, + use_user_code_server=user_code, ) assert automation_sensor.name == "foo" assert automation_sensor.run_tags == tags @@ -49,13 +49,18 @@ def test_default_condition() -> None: ) sensor = AutomationConditionSensorDefinition( - "foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True + "foo", + asset_selection="*", + default_condition=AutomationCondition.eager(), + use_user_code_server=True, ) assert sensor.default_condition == AutomationCondition.eager() def test_limits() -> None: - sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True) + sensor = AutomationConditionSensorDefinition( + "foo", asset_selection="*", use_user_code_server=True + ) defs = Definitions( assets=build_assets(