Skip to content

Commit

Permalink
Store allow_backfills in the sensor metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 10, 2024
1 parent df8311b commit befafe9
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from dagster._core.definitions.utils import check_valid_name
from dagster._utils.tags import normalize_tags

ALLOW_BACKFILLS_METADATA_KEY = "dagster/allow_backfills"


def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext):
from dagster._core.definitions.automation_tick_evaluation_context import (
Expand Down Expand Up @@ -79,6 +81,8 @@ class AutomationConditionSensorDefinition(SensorDefinition):
sensor. The actual interval will be longer if the sensor evaluation takes longer than
the provided interval.
description (Optional[str]): A human-readable description of the sensor.
allow_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to False.
"""

def __init__(
Expand All @@ -92,17 +96,12 @@ def __init__(
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
allow_backfills: bool = False,
**kwargs,
):
self._user_code = kwargs.get("user_code", False)
self._allow_backfills = check.opt_bool_param(
kwargs.get("allow_backfills"), "allow_backfills", default=False
)
check.param_invariant(
not (self._allow_backfills and not self._user_code),
"allow_backfills",
"Setting `allow_backfills` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)
check.bool_param(allow_backfills, "allow_backfills")

self._default_condition = check.opt_inst_param(
kwargs.get("default_condition"), "default_condition", AutomationCondition
)
Expand All @@ -114,6 +113,10 @@ def __init__(

self._run_tags = normalize_tags(run_tags)

# only store this value in the metadata if it's True
if allow_backfills:
metadata = {**(metadata or {}), ALLOW_BACKFILLS_METADATA_KEY: True}

super().__init__(
name=check_valid_name(name),
job_name=None,
Expand All @@ -139,7 +142,7 @@ def asset_selection(self) -> AssetSelection:

@property
def allow_backfills(self) -> bool:
return self._allow_backfills
return ALLOW_BACKFILLS_METADATA_KEY in self.metadata

@property
def default_condition(self) -> Optional[AutomationCondition]:
Expand Down
11 changes: 10 additions & 1 deletion python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
)
from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.automation_condition_sensor_definition import (
ALLOW_BACKFILLS_METADATA_KEY,
)
from dagster._core.definitions.automation_tick_evaluation_context import (
AutomationTickEvaluationContext,
)
Expand Down Expand Up @@ -940,6 +943,7 @@ def _evaluate_auto_materialize_tick(
).without_checks() | AssetSelection.checks(
*{key for key in auto_materialize_entity_keys if isinstance(key, AssetCheckKey)}
)

run_requests, new_cursor, evaluations = AutomationTickEvaluationContext(
evaluation_id=evaluation_id,
asset_graph=asset_graph,
Expand All @@ -954,7 +958,12 @@ def _evaluate_auto_materialize_tick(
**sensor_tags,
},
observe_run_tags={AUTO_OBSERVE_TAG: "true", **sensor_tags},
allow_backfills=False,
allow_backfills=bool(
sensor
and sensor.metadata
and sensor.metadata.standard_metadata
and ALLOW_BACKFILLS_METADATA_KEY in sensor.metadata.standard_metadata
),
auto_observe_asset_keys=auto_observe_asset_keys,
logger=self._logger,
).evaluate()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dagster as dg


def get_defs() -> dg.Definitions:
from .backfill_simple_user_code import defs as uc_defs # noqa

return dg.Definitions(
assets=uc_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", allow_backfills=True
)
],
)


defs = get_defs()
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import AbstractSet, Mapping, Sequence, cast

import dagster._check as check
import pytest
from dagster import AssetMaterialization, RunsFilter, instance_for_test
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
Expand Down Expand Up @@ -418,9 +419,10 @@ def _get_subsets_by_key(
return {s.key: s for s in target_subset.iterate_asset_subsets(asset_graph)}


def test_backfill_creation_simple() -> None:
@pytest.mark.parametrize("location", ["backfill_simple_user_code", "backfill_simple_non_user_code"])
def test_backfill_creation_simple(location: str) -> None:
with get_workspace_request_context(
["backfill_simple"]
[location]
) as context, get_threadpool_executor() as executor:
asset_graph = context.create_request_context().asset_graph

Expand Down

0 comments on commit befafe9

Please sign in to comment.