Skip to content

Commit

Permalink
Create default_automation_condition_sensor in Definitions object
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 22, 2024
1 parent ae9482f commit c63e190
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.partition import PartitionedConfig
from dagster._core.definitions.reconstruct import ReconstructableRepository
from dagster._core.definitions.sensor_definition import RunRequest, SensorDefinition, SkipReason
from dagster._core.definitions.sensor_definition import (
RunRequest,
SensorDefinition,
SensorType,
SkipReason,
)
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.log_manager import coerce_valid_log_level
Expand Down Expand Up @@ -1250,7 +1255,9 @@ def jobless_sensor(_):
auto_materialize_sensor = AutomationConditionSensorDefinition(
"my_auto_materialize_sensor",
asset_selection=AssetSelection.assets(
"fresh_diamond_bottom", "asset_with_automation_condition"
"fresh_diamond_bottom",
"asset_with_automation_condition",
"asset_with_custom_automation_condition",
),
)

Expand Down Expand Up @@ -2166,6 +2173,12 @@ def define_asset_checks():


def _targets_asset_job(instigator: Union[ScheduleDefinition, SensorDefinition]) -> bool:
if isinstance(instigator, SensorDefinition) and instigator.sensor_type in (
# these contain asset selections, which are invalid with the dict repo
SensorType.AUTOMATION,
SensorType.AUTO_MATERIALIZE,
):
return True
try:
return instigator.job_name in asset_job_names or instigator.has_anonymous_job
except DagsterInvalidDefinitionError: # thrown when `job_name` is invalid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1656,17 +1656,22 @@ def test_asset_selection(graphql_context):

assert (
result.data["sensorOrError"]["assetSelection"]["assetSelectionString"]
== "fresh_diamond_bottom or asset_with_automation_condition"
== "fresh_diamond_bottom or asset_with_automation_condition or asset_with_custom_automation_condition"
)
assert result.data["sensorOrError"]["assetSelection"]["assetKeys"] == [
{"path": ["asset_with_automation_condition"]},
{"path": ["asset_with_custom_automation_condition"]},
{"path": ["fresh_diamond_bottom"]},
]
assert result.data["sensorOrError"]["assetSelection"]["assets"] == [
{
"key": {"path": ["asset_with_automation_condition"]},
"definition": {"assetKey": {"path": ["asset_with_automation_condition"]}},
},
{
"key": {"path": ["asset_with_custom_automation_condition"]},
"definition": {"assetKey": {"path": ["asset_with_custom_automation_condition"]}},
},
{
"key": {"path": ["fresh_diamond_bottom"]},
"definition": {"assetKey": {"path": ["fresh_diamond_bottom"]}},
Expand All @@ -1676,6 +1681,9 @@ def test_asset_selection(graphql_context):
{
"key": {"path": ["asset_with_automation_condition"]},
},
{
"key": {"path": ["asset_with_custom_automation_condition"]},
},
{
"key": {"path": ["fresh_diamond_bottom"]},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._utils.tags import normalize_tags

EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"
MAX_ENTITIES = 500
EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"
DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME = "default_automation_condition_sensor"


def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext):
Expand Down Expand Up @@ -57,6 +58,7 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor
"or split this sensor into multiple AutomationConditionSensorDefinitions with AssetSelections that target fewer "
"assets or checks."
)

run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate()

return SensorResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
from dagster._core.definitions.schedule_definition import ScheduleDefinition
from dagster._core.definitions.sensor_definition import SensorDefinition
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
from dagster._core.definitions.utils import dedupe_object_refs
from dagster._core.definitions.utils import (
add_default_automation_condition_sensor,
dedupe_object_refs,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.execution.with_resources import with_resources
Expand Down Expand Up @@ -268,10 +271,17 @@ def _create_repository_using_definitions_args(
# First, dedupe all definition types.
sensors = dedupe_object_refs(sensors)
jobs = dedupe_object_refs(jobs)
assets = dedupe_object_refs(assets)
assets = _canonicalize_specs_to_assets_defs(dedupe_object_refs(assets))
schedules = dedupe_object_refs(schedules)
asset_checks = dedupe_object_refs(asset_checks)

# add in a default automation condition sensor definition if required
sensors = add_default_automation_condition_sensor(
sensors,
[asset for asset in assets if not isinstance(asset, CacheableAssetsDefinition)],
asset_checks or [],
)

executor_def = (
executor
if isinstance(executor, ExecutorDefinition) or executor is None
Expand All @@ -296,7 +306,7 @@ def _create_repository_using_definitions_args(
)
def created_repo():
return [
*with_resources(_canonicalize_specs_to_assets_defs(assets or []), resource_defs),
*with_resources(assets, resource_defs),
*with_resources(asset_checks or [], resource_defs),
*(schedules_with_resources),
*(sensors_with_resources),
Expand Down
97 changes: 96 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
cast,
)

import yaml

import dagster._check as check
from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.utils import is_valid_email
from dagster._utils.warnings import deprecation_warning
from dagster._utils.warnings import deprecation_warning, disable_dagster_warnings
from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls

DEFAULT_OUTPUT = "result"
Expand Down Expand Up @@ -59,10 +62,16 @@

if TYPE_CHECKING:
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.sensor_definition import SensorDefinition
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.remote_representation.external import RemoteSensor


class NoValueSentinel:
Expand Down Expand Up @@ -311,3 +320,89 @@ def resolve_automation_condition(
def dedupe_object_refs(objects: Optional[Iterable[T]]) -> Sequence[T]:
"""Dedupe definitions by reference equality."""
return list({id(obj): obj for obj in objects}.values()) if objects is not None else []


def add_default_automation_condition_sensor(
sensors: Sequence["SensorDefinition"],
assets: Iterable[Union["AssetsDefinition", "SourceAsset"]],
asset_checks: Iterable["AssetsDefinition"],
) -> Sequence["SensorDefinition"]:
"""Adds a default automation condition sensor if the provided sensors do not already handle all
provided assets.
"""
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.automation_condition_sensor_definition import (
DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME,
AutomationConditionSensorDefinition,
)

with disable_dagster_warnings():
asset_graph = AssetGraph.from_assets([*assets, *asset_checks])
sensor_selection = get_default_automation_condition_sensor_selection(sensors, asset_graph)
if sensor_selection:
default_sensor = AutomationConditionSensorDefinition(
DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, asset_selection=sensor_selection
)
sensors = [*sensors, default_sensor]

return sensors


def get_default_automation_condition_sensor_selection(
sensors: Sequence[Union["SensorDefinition", "RemoteSensor"]], asset_graph: "BaseAssetGraph"
) -> Optional["AssetSelection"]:
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.sensor_definition import SensorType

automation_condition_sensors = sorted(
(
s
for s in sensors
if s.sensor_type in (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION)
),
key=lambda s: s.name,
)

automation_condition_keys = set()
for k in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys:
if asset_graph.get(k).automation_condition is not None:
automation_condition_keys.add(k)

has_auto_observe_keys = False
for k in asset_graph.observable_asset_keys:
if (
# for backcompat, treat auto-observe assets as if they have a condition
asset_graph.get(k).automation_condition is not None
or asset_graph.get(k).auto_observe_interval_minutes is not None
):
has_auto_observe_keys = True
automation_condition_keys.add(k)

# get the set of keys that are handled by an existing sensor
covered_keys: Set[EntityKey] = set()
for sensor in automation_condition_sensors:
selection = check.not_none(sensor.asset_selection)
covered_keys = covered_keys.union(
selection.resolve(asset_graph) | selection.resolve_checks(asset_graph)
)

default_sensor_keys = automation_condition_keys - covered_keys
if len(default_sensor_keys) > 0:
# Use AssetSelection.all if the default sensor is the only sensor - otherwise
# enumerate the assets that are not already included in some other
# non-default sensor
default_sensor_asset_selection = AssetSelection.all(include_sources=has_auto_observe_keys)

# if there are any asset checks, include checks in the selection
if any(isinstance(k, AssetCheckKey) for k in default_sensor_keys):
default_sensor_asset_selection |= AssetSelection.all_asset_checks()

# remove any selections that are already covered
for sensor in automation_condition_sensors:
default_sensor_asset_selection = default_sensor_asset_selection - check.not_none(
sensor.asset_selection
)
return default_sensor_asset_selection
# no additional sensor required
else:
return None
Loading

0 comments on commit c63e190

Please sign in to comment.