Skip to content

Commit

Permalink
[1.9] Update default value of allow_backfills to True (#25204)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This updates the default value of this setting. To be merged in 1.9.

## How I Tested These Changes

## Changelog

By default, `AutomationConditionSensorDefinitions` will now emit backfills to handle cases where more than one partition of an asset is requested on a given tick. This allows that asset's BackfillPolicy to be respected. This feature can be disabled by setting `allow_backfills` to `False`.
  • Loading branch information
OwenKephart authored Oct 22, 2024
1 parent 0c1803a commit e850c6d
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class AutomationConditionSensorDefinition(SensorDefinition):
the provided interval.
description (Optional[str]): A human-readable description of the sensor.
emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to False.
of any single asset is requested, rather than individual runs. Defaults to True.
"""

def __init__(
Expand All @@ -107,7 +107,7 @@ def __init__(
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
emit_backfills: bool = False,
emit_backfills: bool = True,
**kwargs,
):
self._user_code = kwargs.get("user_code", False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
StructuredCursor,
)
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._time import get_current_datetime

if TYPE_CHECKING:
Expand Down Expand Up @@ -69,11 +68,6 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
)
condition_unqiue_id = condition.get_node_unique_id(parent_unique_id=None, index=None)

if condition.has_rule_condition and evaluator.emit_backfills:
raise DagsterInvalidDefinitionError(
"Cannot use AutoMaterializePolicies and request backfills. Please use AutomationCondition or set DECLARATIVE_AUTOMATION_REQUEST_BACKFILLS to False."
)

return AutomationContext(
condition=condition,
condition_unique_id=condition_unqiue_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ def get_defs() -> dg.Definitions:

return dg.Definitions(
assets=uc_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", emit_backfills=True
)
],
sensors=[dg.AutomationConditionSensorDefinition(name="the_sensor", asset_selection="*")],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def E() -> None: ...
defs = dg.Definitions(
assets=[A, B, C, D, E],
sensors=[
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", user_code=True, emit_backfills=True
)
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def outside2() -> dg.AssetCheckResult: ...
assets=[backfillA, backfillB, backfillC, run1, run2],
asset_checks=[outsideA, outsideB, outside1, outside2],
sensors=[
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", user_code=True, emit_backfills=True
)
dg.AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
],
)

0 comments on commit e850c6d

Please sign in to comment.