Skip to content

Commit

Permalink
Add error for user code sensors with too many entities targeted
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 17, 2024
1 parent 26443d6 commit f8c18ab
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
repository,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._utils.warnings import disable_dagster_warnings


class AssetLayerConfig(NamedTuple):
Expand All @@ -23,36 +24,38 @@ class AssetLayerConfig(NamedTuple):
def build_assets(
id: str,
layer_configs: Sequence[AssetLayerConfig],
automation_condition: AutomationCondition,
automation_condition: Optional[AutomationCondition],
) -> List[AssetsDefinition]:
layers = []

for layer_config in layer_configs:
parent_index = 0
layer = []
for i in range(layer_config.n_assets):
if layer_config.n_upstreams_per_asset > 0:
# each asset connects to n_upstreams_per_asset assets from the above layer, chosen
# in a round-robin manner
non_argument_deps = {
layers[-1][(parent_index + j) % len(layers[-1])].key
for j in range(layer_config.n_upstreams_per_asset)
}
parent_index += layer_config.n_upstreams_per_asset
else:
non_argument_deps = set()
with disable_dagster_warnings():
for layer_num, layer_config in enumerate(layer_configs):
parent_index = 0
layer = []
for i in range(layer_config.n_assets):
if layer_config.n_upstreams_per_asset > 0:
# each asset connects to n_upstreams_per_asset assets from the above layer, chosen
# in a round-robin manner
non_argument_deps = {
layers[-1][(parent_index + j) % len(layers[-1])].key
for j in range(layer_config.n_upstreams_per_asset)
}
parent_index += layer_config.n_upstreams_per_asset
else:
non_argument_deps = set()

@asset(
partitions_def=layer_config.partitions_def,
name=f"{id}_{len(layers)}_{i}",
automation_condition=automation_condition,
non_argument_deps=non_argument_deps,
)
def _asset():
pass
@asset(
partitions_def=layer_config.partitions_def,
name=f"{id}_{len(layers)}_{i}",
automation_condition=automation_condition,
non_argument_deps=non_argument_deps,
group_name=f"g{layer_num}",
)
def _asset():
pass

layer.append(_asset)
layers.append(layer)
layer.append(_asset)
layers.append(layer)

return list(itertools.chain(*layers))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
SensorType,
)
from dagster._core.definitions.utils import check_valid_name
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._utils.tags import normalize_tags

EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"
MAX_ENTITIES = 500


def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext):
Expand All @@ -34,7 +36,8 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor
context.cursor,
asset_graph,
)
run_requests, new_cursor, updated_evaluations = AutomationTickEvaluationContext(

evaluation_context = AutomationTickEvaluationContext(
evaluation_id=cursor.evaluation_id,
instance=context.instance,
asset_graph=asset_graph,
Expand All @@ -46,7 +49,15 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor
emit_backfills=sensor_def.emit_backfills,
default_condition=sensor_def.default_condition,
logger=context.log,
).evaluate()
)
if evaluation_context.total_keys > MAX_ENTITIES:
raise DagsterInvalidInvocationError(
f'AutomationConditionSensorDefintion "{sensor_def.name}" targets {evaluation_context.total_keys} '
f"assets or checks, which is more than the limit of {MAX_ENTITIES}. Either set `use_user_code_server` to `False`, "
"or split this sensor into multiple AutomationConditionSensorDefinitions with AssetSelections that target fewer "
"assets or checks."
)
run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate()

return SensorResult(
run_requests=run_requests,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
)
if default_condition or asset_graph.get(entity_key).automation_condition is not None
}
self._total_keys = len(resolved_entity_keys)
self._evaluation_id = evaluation_id
self._evaluator = AutomationConditionEvaluator(
entity_keys=resolved_entity_keys,
Expand All @@ -95,6 +96,10 @@ def cursor(self) -> AssetDaemonCursor:
def asset_graph(self) -> BaseAssetGraph:
return self._evaluator.asset_graph

@property
def total_keys(self) -> int:
return self._total_keys

def _legacy_build_auto_observe_run_requests(self) -> Sequence[RunRequest]:
current_timestamp = self._evaluator.evaluation_time.timestamp()
assets_to_auto_observe: Set[AssetKey] = set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
"""Returns an AutoMaterializePolicy which contains this condition."""
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

return AutoMaterializePolicy.from_automation_condition(self)
with disable_dagster_warnings():
return AutoMaterializePolicy.from_automation_condition(self)

@abstractmethod
def evaluate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DagsterInstance
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets


@pytest.mark.parametrize(
Expand Down Expand Up @@ -49,3 +53,41 @@ def test_default_condition() -> None:
"foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True
)
assert sensor.default_condition == AutomationCondition.eager()


def test_limits() -> None:
sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True)

defs = Definitions(
assets=build_assets(
"test",
layer_configs=[AssetLayerConfig(1000)],
automation_condition=AutomationCondition.eager(),
)
)
with pytest.raises(DagsterInvalidInvocationError, match='"foo" targets 1000 assets or checks'):
sensor(
build_sensor_context(
instance=DagsterInstance.ephemeral(),
repository_def=defs.get_repository_def(),
),
)

# more than 500 total assets, but only 400 with a condition
with_condition = build_assets(
"cond",
layer_configs=[AssetLayerConfig(400)],
automation_condition=AutomationCondition.eager(),
)
without_condition = build_assets(
"no_cond",
layer_configs=[AssetLayerConfig(400)],
automation_condition=None,
)
defs = Definitions(assets=[*with_condition, *without_condition])
sensor(
build_sensor_context(
instance=DagsterInstance.ephemeral(),
repository_def=defs.get_repository_def(),
),
)

0 comments on commit f8c18ab

Please sign in to comment.