Skip to content

Commit

Permalink
Add test demonstrating that we do not hit the RESOURCE_EXHAUSTED error above a given threshold
Browse files: browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 15, 2024
1 parent c02046f commit bb14be6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

from dagster import (
AssetsDefinition,
AutoMaterializePolicy,
AutomationCondition,
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
PartitionsDefinition,
StaticPartitionsDefinition,
asset,
repository,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy


class AssetLayerConfig(NamedTuple):
Expand All @@ -22,7 +23,7 @@ class AssetLayerConfig(NamedTuple):
def build_assets(
id: str,
layer_configs: Sequence[AssetLayerConfig],
auto_materialize_policy: AutoMaterializePolicy = AutoMaterializePolicy.eager(),
automation_condition: AutomationCondition,
) -> List[AssetsDefinition]:
layers = []

Expand All @@ -44,7 +45,7 @@ def build_assets(
@asset(
partitions_def=layer_config.partitions_def,
name=f"{id}_{len(layers)}_{i}",
auto_materialize_policy=auto_materialize_policy,
automation_condition=automation_condition,
non_argument_deps=non_argument_deps,
)
def _asset():
Expand Down Expand Up @@ -74,6 +75,7 @@ def auto_materialize_large_time_graph():
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
],
automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
)


Expand All @@ -88,4 +90,5 @@ def auto_materialize_large_static_graph():
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=static),
AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=None),
],
automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from dagster import (
AutomationCondition,
AutomationConditionSensorDefinition,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
)
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets


def get_defs(n: int) -> Definitions:
    """Build a Definitions object with a layered graph of eager assets and one sensor.

    The graph has six layers sized in multiples of ``n // 10`` (1, 2, 2, 2, 2 and 1
    units respectively). The first three layers are hourly-partitioned and the last
    three are daily-partitioned. Every asset uses ``AutomationCondition.eager()``.

    Args:
        n: Nominal total number of assets; each layer size is a multiple of ``n // 10``.

    Returns:
        Definitions containing the generated assets and a single
        AutomationConditionSensorDefinition named "the_sensor" that selects "*".
    """
    hourly = HourlyPartitionsDefinition("2020-01-01-00:00")
    daily = DailyPartitionsDefinition("2020-01-01")
    layer_unit = n // 10

    # One row per layer: (size multiplier, second AssetLayerConfig field, partitions def).
    # The second field is presumably n_upstreams_per_asset -- confirm against AssetLayerConfig.
    layer_specs = [
        (1, 0, hourly),
        (2, 2, hourly),
        (2, 4, hourly),
        (2, 4, daily),
        (2, 2, daily),
        (1, 2, daily),
    ]
    layer_configs = [
        AssetLayerConfig(multiplier * layer_unit, second_field, partitions_def)
        for multiplier, second_field, partitions_def in layer_specs
    ]

    generated_assets = build_assets(
        id="perf_test",
        layer_configs=layer_configs,
        automation_condition=AutomationCondition.eager(),
    )
    automation_sensor = AutomationConditionSensorDefinition(
        "the_sensor", asset_selection="*", user_code=True
    )
    return Definitions(assets=generated_assets, sensors=[automation_sensor])


# Module-level Definitions object, built by get_defs with n=500.
defs = get_defs(500)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import os
import sys
import time
from contextlib import contextmanager
from typing import AbstractSet, Mapping, Sequence, cast

Expand All @@ -15,7 +16,12 @@
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.remote_representation.external import RemoteSensor
from dagster._core.remote_representation.origin import InProcessCodeLocationOrigin
from dagster._core.scheduler.instigation import InstigatorState, SensorInstigatorData
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorTick,
SensorInstigatorData,
TickStatus,
)
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.test_utils import (
InProcessTestWorkspaceLoadTarget,
Expand All @@ -27,25 +33,31 @@
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import InheritContextThreadPoolExecutor
from dagster._core.workspace.context import WorkspaceProcessContext, WorkspaceRequestContext
from dagster._core.workspace.load_target import GrpcServerTarget
from dagster._daemon.asset_daemon import (
AssetDaemon,
asset_daemon_cursor_from_instigator_serialized_cursor,
)
from dagster._daemon.backfill import execute_backfill_iteration
from dagster._daemon.daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration
from dagster._grpc.server import GrpcServerProcess
from dagster._time import get_current_datetime


def get_loadable_target_origin(filename: str) -> LoadableTargetOrigin:
    """Return a LoadableTargetOrigin pointing at a module in the daemon-test definitions package.

    Args:
        filename: Final component of the module path, appended to the
            ``...daemon_tests.definitions`` package name.

    Returns:
        A LoadableTargetOrigin using the current Python executable and the
        current working directory.
    """
    definitions_module = f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}"
    return LoadableTargetOrigin(
        executable_path=sys.executable,
        module_name=definitions_module,
        working_directory=os.getcwd(),
    )


def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin:
return InProcessCodeLocationOrigin(
loadable_target_origin=LoadableTargetOrigin(
executable_path=sys.executable,
module_name=(
f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}"
),
working_directory=os.getcwd(),
),
loadable_target_origin=get_loadable_target_origin(filename),
location_name=filename,
)

Expand Down Expand Up @@ -95,6 +107,35 @@ def get_workspace_request_context(filenames: Sequence[str]):
yield workspace_context


@contextmanager
def get_grpc_workspace_request_context(filename: str):
    """Yield a test daemon workspace context whose code location is a GrpcServerProcess.

    A test instance is created with the SyncInMemoryRunLauncher configured as its run
    launcher, a GrpcServerProcess is started for the module named by ``filename``, and
    a daemon workspace context targeting that server is created. ``_setup_instance``
    is called on the context before it is yielded.

    Args:
        filename: Definitions module name, resolved via get_loadable_target_origin.

    Yields:
        The workspace context created by create_test_daemon_workspace_context.
    """
    instance_overrides = {
        "run_launcher": {
            "module": "dagster._core.launcher.sync_in_memory_run_launcher",
            "class": "SyncInMemoryRunLauncher",
        },
    }
    # The server is started inside the instance's scope so it can take the instance ref.
    with instance_for_test(overrides=instance_overrides) as instance, GrpcServerProcess(
        instance_ref=instance.get_ref(),
        loadable_target_origin=get_loadable_target_origin(filename),
        max_workers=4,
        wait_on_exit=False,
    ) as grpc_server:
        grpc_target = GrpcServerTarget(
            host="localhost",
            socket=grpc_server.socket,
            port=grpc_server.port,
            location_name="test",
        )
        with create_test_daemon_workspace_context(
            workspace_load_target=grpc_target, instance=instance
        ) as workspace_context:
            _setup_instance(workspace_context)
            yield workspace_context


@contextmanager
def get_threadpool_executor():
with SingleThreadPoolExecutor() as executor:
Expand Down Expand Up @@ -166,17 +207,23 @@ def _get_latest_evaluation_ids(context: WorkspaceProcessContext) -> AbstractSet[
return {cursor.evaluation_id for cursor in _get_current_cursors(context).values()}


def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]:
ids = []
request_context = context.create_request_context()
for sensor in _get_automation_sensors(request_context):
ticks = request_context.instance.get_ticks(
def _get_latest_ticks(context: WorkspaceRequestContext) -> Sequence[InstigatorTick]:
latest_ticks = []
for sensor in _get_automation_sensors(context):
ticks = context.instance.get_ticks(
sensor.get_remote_origin_id(),
sensor.get_remote_origin().get_selector().get_id(),
limit=1,
)
latest_tick = next(iter(ticks), None)
if latest_tick and latest_tick.tick_data:
latest_ticks.extend(ticks)
return latest_ticks


def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]:
ids = []
request_context = context.create_request_context()
for latest_tick in _get_latest_ticks(request_context):
if latest_tick.tick_data:
ids.extend(latest_tick.tick_data.reserved_run_ids or [])
return ids

Expand Down Expand Up @@ -463,8 +510,8 @@ def test_backfill_creation_simple(location: str) -> None:


def test_backfill_with_runs_and_checks() -> None:
with get_workspace_request_context(
["backfill_with_runs_and_checks"]
with get_grpc_workspace_request_context(
"backfill_with_runs_and_checks"
) as context, get_threadpool_executor() as executor:
asset_graph = context.create_request_context().asset_graph

Expand Down Expand Up @@ -524,8 +571,8 @@ def test_backfill_with_runs_and_checks() -> None:


def test_custom_condition() -> None:
with get_workspace_request_context(
["custom_condition"]
with get_grpc_workspace_request_context(
"custom_condition"
) as context, get_threadpool_executor() as executor:
time = datetime.datetime(2024, 8, 16, 1, 35)

Expand All @@ -547,3 +594,30 @@ def test_custom_condition() -> None:
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0


def test_500_eager_assets_user_code(capsys) -> None:
    """Evaluate the "500_eager_assets" definitions through a gRPC code location.

    Two tick iterations are executed one frozen minute apart. Each iteration must
    request no runs and finish within the time budget. Afterwards the single latest
    tick must have status SKIPPED (i.e. not a failure), and neither captured stdout
    nor captured stderr may mention RESOURCE_EXHAUSTED.
    """
    max_iteration_seconds = 40.0

    with get_grpc_workspace_request_context(
        "500_eager_assets"
    ) as context, get_threadpool_executor() as executor:
        freeze_dt = datetime.datetime(2024, 8, 16, 1, 35)

        for _ in range(2):
            # Use a monotonic clock for elapsed time: wall-clock time.time() can jump
            # (NTP adjustments, manual changes), which would make this check flaky.
            started_at = time.monotonic()
            with freeze_time(freeze_dt):
                _execute_ticks(context, executor)
                runs = _get_runs_for_latest_ticks(context)
                assert len(runs) == 0
            duration = time.monotonic() - started_at
            assert (
                duration < max_iteration_seconds
            ), f"tick iteration took {duration:.1f}s, expected under {max_iteration_seconds}s"

            freeze_dt += datetime.timedelta(minutes=1)

        # NOTE(review): the checks below are placed after the loop; confirm this
        # matches the intended nesting.
        latest_ticks = _get_latest_ticks(context.create_request_context())
        assert len(latest_ticks) == 1
        # no failure
        assert latest_ticks[0].status == TickStatus.SKIPPED

        # more specific check: readouterr() returns an (out, err) pair of strings,
        # so inspect both captured streams explicitly.
        captured = capsys.readouterr()
        for stream_text in (captured.out, captured.err):
            assert "RESOURCE_EXHAUSTED" not in stream_text
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_eager_perf() -> None:
AssetLayerConfig(200, 2, daily_partitions_def),
AssetLayerConfig(100, 2, daily_partitions_def),
],
auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
automation_condition=AutomationCondition.eager(),
)

instance = DagsterInstance.ephemeral()
Expand Down

0 comments on commit bb14be6

Please sign in to comment.