Skip to content

Commit

Permalink
Add test demonstrating that we do not hit the RESOURCE_EXHAUSTED erro…
Browse files Browse the repository at this point in the history
…r above a given threshold (dagster-io#25214)

## Summary & Motivation

As title. This sets a baseline for feeling "safe" with this many assets.

Some upgrades were needed to the test suite to make it more realistic such that we could trip the error.

Future changes will allow us to greatly increase, or entirely remove, this limit.

## How I Tested These Changes

Confirmed that this test fails if 1000 assets are supplied instead of 500.

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
OwenKephart authored and Grzyblon committed Oct 26, 2024
1 parent 5646577 commit f4e2cf8
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

from dagster import (
AssetsDefinition,
AutoMaterializePolicy,
AutomationCondition,
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
PartitionsDefinition,
StaticPartitionsDefinition,
asset,
repository,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy


class AssetLayerConfig(NamedTuple):
Expand All @@ -22,7 +23,7 @@ class AssetLayerConfig(NamedTuple):
def build_assets(
id: str,
layer_configs: Sequence[AssetLayerConfig],
auto_materialize_policy: AutoMaterializePolicy = AutoMaterializePolicy.eager(),
automation_condition: AutomationCondition,
) -> List[AssetsDefinition]:
layers = []

Expand All @@ -44,7 +45,7 @@ def build_assets(
@asset(
partitions_def=layer_config.partitions_def,
name=f"{id}_{len(layers)}_{i}",
auto_materialize_policy=auto_materialize_policy,
automation_condition=automation_condition,
non_argument_deps=non_argument_deps,
)
def _asset():
Expand Down Expand Up @@ -74,6 +75,7 @@ def auto_materialize_large_time_graph():
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
],
automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
)


Expand All @@ -88,4 +90,5 @@ def auto_materialize_large_static_graph():
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=static),
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=None),
],
automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from dagster import (
AutomationCondition,
AutomationConditionSensorDefinition,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
)
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets


def get_defs(n: int) -> Definitions:
    """Build a ``Definitions`` with six layers of eagerly-automated assets.

    The layers are sized in multiples of ``unit = n // 10`` (1, 2, 2, 2, 2, 1),
    so ten units of assets are requested in total; any remainder of
    ``n / 10`` is dropped by the integer division. The first three layers are
    hourly-partitioned and the last three daily-partitioned.

    Args:
        n: Approximate total number of assets to generate.

    Returns:
        A ``Definitions`` holding the generated assets plus a single
        ``AutomationConditionSensorDefinition`` named ``"the_sensor"`` that
        selects every asset and is created with ``user_code=True``.
    """
    unit = n // 10
    hourly = HourlyPartitionsDefinition("2020-01-01-00:00")
    daily = DailyPartitionsDefinition("2020-01-01")

    # (size in units, second AssetLayerConfig field, partitions definition);
    # the second field is presumably n_upstreams_per_asset -- confirm against
    # the AssetLayerConfig declaration.
    layer_shapes = [
        (1, 0, hourly),
        (2, 2, hourly),
        (2, 4, hourly),
        (2, 4, daily),
        (2, 2, daily),
        (1, 2, daily),
    ]
    layer_configs = [
        AssetLayerConfig(multiple * unit, second_field, partitions_def)
        for multiple, second_field, partitions_def in layer_shapes
    ]

    generated_assets = build_assets(
        id="perf_test",
        layer_configs=layer_configs,
        automation_condition=AutomationCondition.eager(),
    )
    automation_sensor = AutomationConditionSensorDefinition(
        "the_sensor", asset_selection="*", user_code=True
    )
    return Definitions(assets=generated_assets, sensors=[automation_sensor])


# Module-level Definitions object. With n=500, get_defs uses unit = 50, so ten
# units (500 assets) are requested in total.
defs = get_defs(500)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import os
import sys
import time
from contextlib import contextmanager
from typing import AbstractSet, Mapping, Optional, Sequence, cast

Expand All @@ -15,7 +16,12 @@
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.remote_representation.external import RemoteSensor
from dagster._core.remote_representation.origin import InProcessCodeLocationOrigin
from dagster._core.scheduler.instigation import InstigatorState, SensorInstigatorData
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorTick,
SensorInstigatorData,
TickStatus,
)
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.test_utils import (
InProcessTestWorkspaceLoadTarget,
Expand All @@ -27,27 +33,33 @@
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import InheritContextThreadPoolExecutor
from dagster._core.workspace.context import WorkspaceProcessContext, WorkspaceRequestContext
from dagster._core.workspace.load_target import GrpcServerTarget
from dagster._daemon.asset_daemon import (
AssetDaemon,
asset_daemon_cursor_from_instigator_serialized_cursor,
)
from dagster._daemon.backfill import execute_backfill_iteration
from dagster._daemon.daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration
from dagster._grpc.server import GrpcServerProcess
from dagster._time import get_current_datetime


def get_loadable_target_origin(filename: str) -> LoadableTargetOrigin:
    """Build the ``LoadableTargetOrigin`` for one of the test definitions modules.

    Args:
        filename: Name of the module (without extension) inside the
            ``daemon_tests.definitions`` test package.

    Returns:
        An origin that targets that module, using the current Python
        executable and the current working directory.
    """
    module_name = f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}"
    origin = LoadableTargetOrigin(
        executable_path=sys.executable,
        module_name=module_name,
        working_directory=os.getcwd(),
    )
    return origin


def get_code_location_origin(
    filename: str, location_name: Optional[str] = None
) -> InProcessCodeLocationOrigin:
    """Build an in-process code location origin for a test definitions module.

    The loadable target is produced by ``get_loadable_target_origin`` so that
    the in-process and gRPC test contexts point at modules the same way. The
    ``loadable_target_origin`` keyword is passed exactly once; supplying both
    the inline ``LoadableTargetOrigin(...)`` and the helper call is a syntax
    error (repeated keyword argument).

    Args:
        filename: Name of the definitions module to load.
        location_name: Name for the code location; falls back to ``filename``
            when omitted or empty.

    Returns:
        The ``InProcessCodeLocationOrigin`` for that module.
    """
    return InProcessCodeLocationOrigin(
        loadable_target_origin=get_loadable_target_origin(filename),
        location_name=location_name or filename,
    )

Expand Down Expand Up @@ -97,6 +109,35 @@ def get_workspace_request_context(filenames: Sequence[str]):
yield workspace_context


@contextmanager
def get_grpc_workspace_request_context(filename: str):
    """Yield a test daemon workspace context served by a ``GrpcServerProcess``.

    A test instance is created with its run launcher overridden to
    ``SyncInMemoryRunLauncher``, a gRPC server process is started for the
    definitions module named by ``filename``, and a daemon workspace context
    is created against that server under the location name ``"test"``.
    ``_setup_instance`` is called on the workspace context before it is
    yielded.

    Args:
        filename: Name of the definitions module to load in the gRPC server.

    Yields:
        The workspace context returned by
        ``create_test_daemon_workspace_context``.
    """
    instance_overrides = {
        "run_launcher": {
            "module": "dagster._core.launcher.sync_in_memory_run_launcher",
            "class": "SyncInMemoryRunLauncher",
        },
    }
    with instance_for_test(overrides=instance_overrides) as instance, GrpcServerProcess(
        instance_ref=instance.get_ref(),
        loadable_target_origin=get_loadable_target_origin(filename),
        max_workers=4,
        wait_on_exit=False,
    ) as server_process:
        # Point the workspace at the server that was just started.
        grpc_target = GrpcServerTarget(
            host="localhost",
            socket=server_process.socket,
            port=server_process.port,
            location_name="test",
        )
        with create_test_daemon_workspace_context(
            workspace_load_target=grpc_target, instance=instance
        ) as workspace_context:
            _setup_instance(workspace_context)
            yield workspace_context


@contextmanager
def get_threadpool_executor():
with SingleThreadPoolExecutor() as executor:
Expand Down Expand Up @@ -168,17 +209,23 @@ def _get_latest_evaluation_ids(context: WorkspaceProcessContext) -> AbstractSet[
return {cursor.evaluation_id for cursor in _get_current_cursors(context).values()}


def _get_latest_ticks(context: WorkspaceRequestContext) -> Sequence[InstigatorTick]:
    """Collect at most one tick per automation sensor found in ``context``.

    For every sensor returned by ``_get_automation_sensors``, the instance is
    queried with ``limit=1``; whatever ticks come back are appended to the
    result. (Presumably the single tick returned is the newest one -- the
    ordering of ``get_ticks`` is not visible here.)

    Args:
        context: Request context whose instance and sensors are inspected.

    Returns:
        The collected ticks, in sensor iteration order. Sensors with no ticks
        contribute nothing.
    """
    latest_ticks = []
    for sensor in _get_automation_sensors(context):
        ticks = context.instance.get_ticks(
            sensor.get_remote_origin_id(),
            sensor.get_remote_origin().get_selector().get_id(),
            limit=1,
        )
        latest_ticks.extend(ticks)
    return latest_ticks


def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]:
    """Gather the reserved run ids recorded on the latest automation sensor ticks.

    Args:
        context: Process context used to create a fresh request context.

    Returns:
        The concatenated ``reserved_run_ids`` of every tick returned by
        ``_get_latest_ticks``. Ticks without ``tick_data``, or whose
        ``reserved_run_ids`` is empty or ``None``, contribute nothing.
    """
    request_context = context.create_request_context()
    return [
        run_id
        for tick in _get_latest_ticks(request_context)
        if tick.tick_data
        for run_id in (tick.tick_data.reserved_run_ids or [])
    ]

Expand Down Expand Up @@ -465,8 +512,8 @@ def test_backfill_creation_simple(location: str) -> None:


def test_backfill_with_runs_and_checks() -> None:
with get_workspace_request_context(
["backfill_with_runs_and_checks"]
with get_grpc_workspace_request_context(
"backfill_with_runs_and_checks"
) as context, get_threadpool_executor() as executor:
asset_graph = context.create_request_context().asset_graph

Expand Down Expand Up @@ -577,8 +624,8 @@ def test_toggle_user_code() -> None:


def test_custom_condition() -> None:
with get_workspace_request_context(
["custom_condition"]
with get_grpc_workspace_request_context(
"custom_condition"
) as context, get_threadpool_executor() as executor:
time = datetime.datetime(2024, 8, 16, 1, 35)

Expand All @@ -600,3 +647,30 @@ def test_custom_condition() -> None:
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0


def test_500_eager_assets_user_code(capsys) -> None:
    """Evaluate the ``500_eager_assets`` definitions through a gRPC server.

    Two rounds of ticks are executed under ``freeze_time``, one minute apart.
    Each round must request no runs and finish in under 40 seconds as measured
    by ``time.time()``. Afterwards there must be exactly one latest tick, its
    status must be ``SKIPPED``, and neither captured stdout nor stderr may
    contain ``RESOURCE_EXHAUSTED``.
    """
    with get_grpc_workspace_request_context(
        "500_eager_assets"
    ) as context, get_threadpool_executor() as executor:
        freeze_dt = datetime.datetime(2024, 8, 16, 1, 35)
        one_minute = datetime.timedelta(minutes=1)

        for _ in range(2):
            started_at = time.time()
            with freeze_time(freeze_dt):
                _execute_ticks(context, executor)
                requested_runs = _get_runs_for_latest_ticks(context)
                assert len(requested_runs) == 0
            elapsed = time.time() - started_at
            assert elapsed < 40.0

            freeze_dt += one_minute

        latest_ticks = _get_latest_ticks(context.create_request_context())
        assert len(latest_ticks) == 1
        # no failure
        assert latest_ticks[0].status == TickStatus.SKIPPED

        # more specific check: inspect both captured streams (stdout, stderr)
        captured = capsys.readouterr()
        assert "RESOURCE_EXHAUSTED" not in captured.out
        assert "RESOURCE_EXHAUSTED" not in captured.err
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_eager_perf() -> None:
AssetLayerConfig(200, 2, daily_partitions_def),
AssetLayerConfig(100, 2, daily_partitions_def),
],
auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
automation_condition=AutomationCondition.eager(),
)

instance = DagsterInstance.ephemeral()
Expand Down

0 comments on commit f4e2cf8

Please sign in to comment.