Add test demonstrating that we do not hit the RESOURCE_EXHAUSTED error above a given threshold #25214

Merged
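For context: RESOURCE_EXHAUSTED is the gRPC status returned when a message exceeds the channel's configured size cap (received messages default to 4 MB). Evaluating hundreds of automation conditions in a user code server moves large serialized payloads over gRPC, which is presumably how this error could be hit; the new test pins a 500-asset graph below that threshold. A minimal sketch of the mechanics, illustrative only (the address and raised limit are hypothetical, not values from this PR):

```python
# Illustrative sketch, not code from this PR: gRPC rejects messages larger
# than the channel's maximum with StatusCode.RESOURCE_EXHAUSTED; raising the
# limit via channel options is the usual mitigation.
import grpc

FIFTY_MB = 50 * 1024 * 1024  # hypothetical raised limit (default receive cap is 4 MB)

channel = grpc.insecure_channel(
    "localhost:4000",  # hypothetical server address
    options=[
        ("grpc.max_send_message_length", FIFTY_MB),
        ("grpc.max_receive_message_length", FIFTY_MB),
    ],
)
```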
File: dagster_test/toys/auto_materializing/large_graph.py
@@ -3,14 +3,15 @@

 from dagster import (
     AssetsDefinition,
-    AutoMaterializePolicy,
+    AutomationCondition,
     DailyPartitionsDefinition,
     HourlyPartitionsDefinition,
     PartitionsDefinition,
     StaticPartitionsDefinition,
     asset,
     repository,
 )
+from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
 
 
 class AssetLayerConfig(NamedTuple):
@@ -22,7 +23,7 @@ class AssetLayerConfig(NamedTuple):
 def build_assets(
     id: str,
     layer_configs: Sequence[AssetLayerConfig],
-    auto_materialize_policy: AutoMaterializePolicy = AutoMaterializePolicy.eager(),
+    automation_condition: AutomationCondition,
 ) -> List[AssetsDefinition]:
     layers = []

@@ -44,7 +45,7 @@ def build_assets(
         @asset(
             partitions_def=layer_config.partitions_def,
             name=f"{id}_{len(layers)}_{i}",
-            auto_materialize_policy=auto_materialize_policy,
+            automation_condition=automation_condition,
             non_argument_deps=non_argument_deps,
         )
         def _asset():
@@ -74,6 +75,7 @@ def auto_materialize_large_time_graph():
             AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
             AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily),
         ],
+        automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
     )


@@ -88,4 +90,5 @@ def auto_materialize_large_static_graph():
             AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=static),
             AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=None),
         ],
+        automation_condition=AutoMaterializePolicy.eager().to_automation_condition(),
     )
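Both toy graphs now pass an explicit `automation_condition` into `build_assets` instead of relying on the old `auto_materialize_policy` default. A quick sketch of the two spellings that appear in this diff; they are intended to be interchangeable here, though this sketch does not assert that the resulting conditions are identical:

```python
from dagster import AutoMaterializePolicy, AutomationCondition

# Spelling used above: convert the legacy eager policy into a condition.
from_legacy = AutoMaterializePolicy.eager().to_automation_condition()

# Spelling used in the new definitions module below: build it directly.
direct = AutomationCondition.eager()
```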
File: dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/500_eager_assets.py (new file)
@@ -0,0 +1,35 @@
+from dagster import (
+    AutomationCondition,
+    AutomationConditionSensorDefinition,
+    DailyPartitionsDefinition,
+    Definitions,
+    HourlyPartitionsDefinition,
+)
+from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets
+
+
+def get_defs(n: int) -> Definitions:
+    hourly_partitions_def = HourlyPartitionsDefinition("2020-01-01-00:00")
+    daily_partitions_def = DailyPartitionsDefinition("2020-01-01")
+    unit = n // 10
+    assets = build_assets(
+        id="perf_test",
+        layer_configs=[
+            AssetLayerConfig(1 * unit, 0, hourly_partitions_def),
+            AssetLayerConfig(2 * unit, 2, hourly_partitions_def),
+            AssetLayerConfig(2 * unit, 4, hourly_partitions_def),
+            AssetLayerConfig(2 * unit, 4, daily_partitions_def),
+            AssetLayerConfig(2 * unit, 2, daily_partitions_def),
+            AssetLayerConfig(1 * unit, 2, daily_partitions_def),
+        ],
+        automation_condition=AutomationCondition.eager(),
+    )
+    return Definitions(
+        assets=assets,
+        sensors=[
+            AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True)
+        ],
+    )
+
+
+defs = get_defs(500)
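Layer arithmetic for the module above: the six layers hold 1+2+2+2+2+1 = 10 units of assets and `unit = n // 10`, so `get_defs(500)` builds exactly 500 assets. A small sanity sketch using the same helper (illustrative, not part of the PR):

```python
# Illustrative sketch: each AssetLayerConfig(n_assets, n_upstreams_per_asset,
# partitions_def) layer contributes n_assets assets to the result.
from dagster import AutomationCondition, HourlyPartitionsDefinition
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets

hourly = HourlyPartitionsDefinition("2020-01-01-00:00")
assets = build_assets(
    id="sanity_check",
    layer_configs=[
        AssetLayerConfig(2, 0, hourly),  # 2 root assets
        AssetLayerConfig(3, 2, hourly),  # 3 assets, each depending on 2 roots
    ],
    automation_condition=AutomationCondition.eager(),
)
assert len(assets) == 5  # one AssetsDefinition per configured asset
```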
File: daemon test suite under dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/ (exact filename not shown)
@@ -1,6 +1,7 @@
 import datetime
 import os
 import sys
+import time
 from contextlib import contextmanager
 from typing import AbstractSet, Mapping, Optional, Sequence, cast

@@ -15,7 +16,12 @@
 from dagster._core.execution.backfill import PartitionBackfill
 from dagster._core.remote_representation.external import RemoteSensor
 from dagster._core.remote_representation.origin import InProcessCodeLocationOrigin
-from dagster._core.scheduler.instigation import InstigatorState, SensorInstigatorData
+from dagster._core.scheduler.instigation import (
+    InstigatorState,
+    InstigatorTick,
+    SensorInstigatorData,
+    TickStatus,
+)
 from dagster._core.storage.dagster_run import DagsterRun
 from dagster._core.test_utils import (
     InProcessTestWorkspaceLoadTarget,
@@ -27,27 +33,33 @@
 from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster._core.utils import InheritContextThreadPoolExecutor
 from dagster._core.workspace.context import WorkspaceProcessContext, WorkspaceRequestContext
+from dagster._core.workspace.load_target import GrpcServerTarget
 from dagster._daemon.asset_daemon import (
     AssetDaemon,
     asset_daemon_cursor_from_instigator_serialized_cursor,
 )
 from dagster._daemon.backfill import execute_backfill_iteration
 from dagster._daemon.daemon import get_default_daemon_logger
 from dagster._daemon.sensor import execute_sensor_iteration
+from dagster._grpc.server import GrpcServerProcess
 from dagster._time import get_current_datetime
 
 
+def get_loadable_target_origin(filename: str) -> LoadableTargetOrigin:
+    return LoadableTargetOrigin(
+        executable_path=sys.executable,
+        module_name=(
+            f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}"
+        ),
+        working_directory=os.getcwd(),
+    )
+
+
 def get_code_location_origin(
     filename: str, location_name: Optional[str] = None
 ) -> InProcessCodeLocationOrigin:
     return InProcessCodeLocationOrigin(
-        loadable_target_origin=LoadableTargetOrigin(
-            executable_path=sys.executable,
-            module_name=(
-                f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}"
-            ),
-            working_directory=os.getcwd(),
-        ),
+        loadable_target_origin=get_loadable_target_origin(filename),
         location_name=location_name or filename,
     )

@@ -97,6 +109,35 @@ def get_workspace_request_context(filenames: Sequence[str]):
             yield workspace_context
 
 
+@contextmanager
+def get_grpc_workspace_request_context(filename: str):
+    with instance_for_test(
+        overrides={
+            "run_launcher": {
+                "module": "dagster._core.launcher.sync_in_memory_run_launcher",
+                "class": "SyncInMemoryRunLauncher",
+            },
+        }
+    ) as instance:
+        with GrpcServerProcess(
+            instance_ref=instance.get_ref(),
+            loadable_target_origin=get_loadable_target_origin(filename),
+            max_workers=4,
+            wait_on_exit=False,
+        ) as server_process:
+            target = GrpcServerTarget(
+                host="localhost",
+                socket=server_process.socket,
+                port=server_process.port,
+                location_name="test",
+            )
+            with create_test_daemon_workspace_context(
+                workspace_load_target=target, instance=instance
+            ) as workspace_context:
+                _setup_instance(workspace_context)
+                yield workspace_context
+
+
 @contextmanager
 def get_threadpool_executor():
     with SingleThreadPoolExecutor() as executor:
@@ -168,17 +209,23 @@ def _get_latest_evaluation_ids(context: WorkspaceProcessContext) -> AbstractSet[
     return {cursor.evaluation_id for cursor in _get_current_cursors(context).values()}
 
 
-def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]:
-    ids = []
-    request_context = context.create_request_context()
-    for sensor in _get_automation_sensors(request_context):
-        ticks = request_context.instance.get_ticks(
+def _get_latest_ticks(context: WorkspaceRequestContext) -> Sequence[InstigatorTick]:
+    latest_ticks = []
+    for sensor in _get_automation_sensors(context):
+        ticks = context.instance.get_ticks(
             sensor.get_remote_origin_id(),
             sensor.get_remote_origin().get_selector().get_id(),
             limit=1,
         )
-        latest_tick = next(iter(ticks), None)
-        if latest_tick and latest_tick.tick_data:
+        latest_ticks.extend(ticks)
+    return latest_ticks
+
+
+def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]:
+    ids = []
+    request_context = context.create_request_context()
+    for latest_tick in _get_latest_ticks(request_context):
+        if latest_tick.tick_data:
             ids.extend(latest_tick.tick_data.reserved_run_ids or [])
     return ids

@@ -465,8 +512,8 @@ def test_backfill_creation_simple(location: str) -> None:


 def test_backfill_with_runs_and_checks() -> None:
-    with get_workspace_request_context(
-        ["backfill_with_runs_and_checks"]
+    with get_grpc_workspace_request_context(
+        "backfill_with_runs_and_checks"
     ) as context, get_threadpool_executor() as executor:
         asset_graph = context.create_request_context().asset_graph

@@ -577,8 +624,8 @@ def test_toggle_user_code() -> None:


 def test_custom_condition() -> None:
-    with get_workspace_request_context(
-        ["custom_condition"]
+    with get_grpc_workspace_request_context(
+        "custom_condition"
     ) as context, get_threadpool_executor() as executor:
         time = datetime.datetime(2024, 8, 16, 1, 35)

@@ -600,3 +647,30 @@ def test_custom_condition() -> None:
         _execute_ticks(context, executor)
         runs = _get_runs_for_latest_ticks(context)
         assert len(runs) == 0
+
+
+def test_500_eager_assets_user_code(capsys) -> None:
+    with get_grpc_workspace_request_context(
+        "500_eager_assets"
+    ) as context, get_threadpool_executor() as executor:
+        freeze_dt = datetime.datetime(2024, 8, 16, 1, 35)
+
+        for _ in range(2):
+            clock_time = time.time()
+            with freeze_time(freeze_dt):
+                _execute_ticks(context, executor)
+                runs = _get_runs_for_latest_ticks(context)
+                assert len(runs) == 0
+            duration = time.time() - clock_time
+            assert duration < 40.0
+
+            freeze_dt += datetime.timedelta(minutes=1)
+
+        latest_ticks = _get_latest_ticks(context.create_request_context())
+        assert len(latest_ticks) == 1
+        # no failure: the tick should end as SKIPPED rather than erroring
+        assert latest_ticks[0].status == TickStatus.SKIPPED
+
+        # more specific check: the gRPC error string never appears in the output
+        for line in capsys.readouterr():
+            assert "RESOURCE_EXHAUSTED" not in line
File: automation condition perf test module (exact path not shown)
@@ -23,7 +23,7 @@ def test_eager_perf() -> None:
             AssetLayerConfig(200, 2, daily_partitions_def),
             AssetLayerConfig(100, 2, daily_partitions_def),
         ],
-        auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
+        automation_condition=AutomationCondition.eager(),
     )
 
     instance = DagsterInstance.ephemeral()