diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index 68102bbcd5d6b..af92af15dcad3 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -17,6 +17,8 @@ from dagster._core.definitions.utils import check_valid_name from dagster._utils.tags import normalize_tags +ALLOW_BACKFILLS_METADATA_KEY = "dagster/allow_backfills" + def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext): from dagster._core.definitions.automation_tick_evaluation_context import ( @@ -79,6 +81,8 @@ class AutomationConditionSensorDefinition(SensorDefinition): sensor. The actual interval will be longer if the sensor evaluation takes longer than the provided interval. description (Optional[str]): A human-readable description of the sensor. + allow_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition + of any single asset is requested, rather than individual runs. Defaults to False. """ def __init__( @@ -92,17 +96,12 @@ def __init__( minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, metadata: Optional[Mapping[str, object]] = None, + allow_backfills: bool = False, **kwargs, ): self._user_code = kwargs.get("user_code", False) - self._allow_backfills = check.opt_bool_param( - kwargs.get("allow_backfills"), "allow_backfills", default=False - ) - check.param_invariant( - not (self._allow_backfills and not self._user_code), - "allow_backfills", - "Setting `allow_backfills` for a non-user-code AutomationConditionSensorDefinition is not supported.", - ) + check.bool_param(allow_backfills, "allow_backfills") + self._default_condition = check.opt_inst_param( kwargs.get("default_condition"), "default_condition", AutomationCondition ) @@ -114,6 +113,10 @@ def __init__( self._run_tags = normalize_tags(run_tags) + # only store this value in the metadata if it's True + if allow_backfills: + metadata = {**(metadata or {}), ALLOW_BACKFILLS_METADATA_KEY: True} + super().__init__( name=check_valid_name(name), job_name=None, @@ -139,7 +142,7 @@ def asset_selection(self) -> AssetSelection: @property def allow_backfills(self) -> bool: - return self._allow_backfills + return ALLOW_BACKFILLS_METADATA_KEY in self.metadata @property def default_condition(self) -> Optional[AutomationCondition]: diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 6d946308cb38f..f2a1298f685f3 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -19,6 +19,9 @@ ) from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey from dagster._core.definitions.asset_selection import AssetSelection +from dagster._core.definitions.automation_condition_sensor_definition import ( + ALLOW_BACKFILLS_METADATA_KEY, +) from dagster._core.definitions.automation_tick_evaluation_context import ( AutomationTickEvaluationContext, ) @@ -940,6 +943,7 @@ def _evaluate_auto_materialize_tick( ).without_checks() | AssetSelection.checks( *{key for key in auto_materialize_entity_keys if isinstance(key, AssetCheckKey)} ) + run_requests, new_cursor, evaluations = AutomationTickEvaluationContext( evaluation_id=evaluation_id, asset_graph=asset_graph, @@ -954,7 +958,12 @@ def _evaluate_auto_materialize_tick( **sensor_tags, }, observe_run_tags={AUTO_OBSERVE_TAG: "true", **sensor_tags}, - allow_backfills=False, + allow_backfills=bool( + sensor + and sensor.metadata + and sensor.metadata.standard_metadata + and ALLOW_BACKFILLS_METADATA_KEY in sensor.metadata.standard_metadata + ), auto_observe_asset_keys=auto_observe_asset_keys, logger=self._logger, ).evaluate() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_non_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_non_user_code.py new file mode 100644 index 0000000000000..65b0e0c9a7981 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_non_user_code.py @@ -0,0 +1,17 @@ +import dagster as dg + + +def get_defs() -> dg.Definitions: + from .backfill_simple_user_code import defs as uc_defs # noqa + + return dg.Definitions( + assets=uc_defs.assets, + sensors=[ + dg.AutomationConditionSensorDefinition( + name="the_sensor", asset_selection="*", allow_backfills=True + ) + ], + ) + + +defs = get_defs() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py similarity index 100% rename from python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple.py rename to python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/backfill_simple_user_code.py diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index 03d6f8d3c3396..7ba21c16daecb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -5,6 +5,7 @@ from typing import AbstractSet, Mapping, Sequence, cast import dagster._check as check +import pytest from dagster import AssetMaterialization, RunsFilter, instance_for_test from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor @@ -418,9 +419,10 @@ def _get_subsets_by_key( return {s.key: s for s in target_subset.iterate_asset_subsets(asset_graph)} -def test_backfill_creation_simple() -> None: +@pytest.mark.parametrize("location", ["backfill_simple_user_code", "backfill_simple_non_user_code"]) +def test_backfill_creation_simple(location: str) -> None: with get_workspace_request_context( - ["backfill_simple"] + [location] ) as context, get_threadpool_executor() as executor: asset_graph = context.create_request_context().asset_graph