Skip to content

Commit

Permalink
Add a way for the step delegating executor to query events in rolling…
Browse files Browse the repository at this point in the history
… windows (#19803)

Summary:
This provides an offset environment variable that you can use to make
the step delegating executor apply an offset before querying events from
the event log. To ensure that events are not queried twice, a set of
seen event IDs is checked before returning results.

Test Plan: BK, adding new test now

## Summary & Motivation

## How I Tested These Changes

(cherry picked from commit a18c155)
  • Loading branch information
gibsondan authored and jmsanders committed Feb 14, 2024
1 parent 10b99d8 commit d745454
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
import sys
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Set, cast

import pendulum

import dagster._check as check
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.event_api import EventLogCursor
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
from dagster._core.execution.context.system import PlanOrchestrationContext
from dagster._core.execution.plan.active import ActiveExecution
Expand Down Expand Up @@ -68,24 +69,44 @@ def __init__(
),
)
self._should_verify_step = should_verify_step

self._event_cursor: Optional[str] = None
self._pop_events_offset = int(os.getenv("DAGSTER_EXECUTOR_POP_EVENTS_OFFSET", "0"))

@property
def retries(self):
return self._retries

def _pop_events(self, instance: DagsterInstance, run_id: str) -> Sequence[DagsterEvent]:
def _pop_events(
self, instance: DagsterInstance, run_id: str, seen_storage_ids: Set[int]
) -> Sequence[DagsterEvent]:
adjusted_cursor = self._event_cursor

if self._pop_events_offset > 0 and self._event_cursor:
cursor_obj = EventLogCursor.parse(self._event_cursor)
check.invariant(
cursor_obj.is_id_cursor(),
"Applying a tailer offset only works with an id-based cursor",
)
adjusted_cursor = EventLogCursor.from_storage_id(
cursor_obj.storage_id() - self._pop_events_offset
).to_string()

conn = instance.get_records_for_run(
run_id,
self._event_cursor,
adjusted_cursor,
of_type=set(DagsterEventType),
)
self._event_cursor = conn.cursor

dagster_events = [
record.event_log_entry.dagster_event
for record in conn.records
if record.event_log_entry.dagster_event
if record.event_log_entry.dagster_event and record.storage_id not in seen_storage_ids
]

seen_storage_ids.update(record.storage_id for record in conn.records)

return dagster_events

def _get_step_handler_context(
Expand All @@ -111,6 +132,7 @@ def _get_step_handler_context(
def execute(self, plan_context: PlanOrchestrationContext, execution_plan: ExecutionPlan):
check.inst_param(plan_context, "plan_context", PlanOrchestrationContext)
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
seen_storage_ids = set()

DagsterEvent.engine_event(
plan_context,
Expand Down Expand Up @@ -139,6 +161,7 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
prior_events = self._pop_events(
plan_context.instance,
plan_context.run_id,
seen_storage_ids,
)
for dagster_event in prior_events:
yield dagster_event
Expand Down Expand Up @@ -237,6 +260,7 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
for dagster_event in self._pop_events(
plan_context.instance,
plan_context.run_id,
seen_storage_ids,
):
yield dagster_event
# STEP_SKIPPED events are only emitted by ActiveExecution, which already handles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
StepHandler,
)
from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG
from dagster._core.test_utils import instance_for_test
from dagster._core.test_utils import environ, instance_for_test
from dagster._utils.merger import merge_dicts

from .retry_jobs import (
Expand Down Expand Up @@ -143,6 +143,29 @@ def test_execute():
assert TestStepHandler.verify_step_count == 0


def test_execute_with_tailer_offset():
TestStepHandler.reset()
with instance_for_test() as instance:
with environ({"DAGSTER_EXECUTOR_POP_EVENTS_OFFSET": "100000"}):
result = execute_job(
reconstructable(foo_job),
instance=instance,
run_config={"execution": {"config": {}}},
)
TestStepHandler.wait_for_processes()

assert any(
[
"Starting execution with step handler TestStepHandler" in event.message
for event in result.all_events
]
)
assert any(["STEP_START" in event for event in result.all_events])
assert result.success
assert TestStepHandler.saw_baz_op
assert TestStepHandler.verify_step_count == 0


def test_skip_execute():
from .test_jobs import define_dynamic_skipping_job

Expand Down

0 comments on commit d745454

Please sign in to comment.