def _pop_events(
    self, instance: DagsterInstance, run_id: str, seen_storage_ids: Set[int]
) -> Sequence[DagsterEvent]:
    """Fetch the run's event records past the stored cursor and return the unseen Dagster events.

    When ``self._pop_events_offset`` is positive and a cursor has already been stored, the
    cursor used for the query is moved back by that many storage ids, so records that were
    fetched on an earlier call may be fetched again. ``seen_storage_ids`` is what keeps those
    re-fetched records from being returned a second time.

    Args:
        instance: Instance whose ``get_records_for_run`` is queried.
        run_id: Id of the run whose records are fetched.
        seen_storage_ids: Storage ids of records fetched by earlier calls. Updated in place
            with the storage id of every record fetched by this call.

    Returns:
        The Dagster events of the fetched records, in fetch order, excluding records without
        a Dagster event and records whose storage id was already in ``seen_storage_ids``.

    Fails the ``check.invariant`` if an offset is applied to a cursor that is not id-based.
    """
    fetch_cursor = self._event_cursor

    if self._pop_events_offset > 0 and self._event_cursor:
        parsed_cursor = EventLogCursor.parse(self._event_cursor)
        check.invariant(
            parsed_cursor.is_id_cursor(),
            "Applying a tailer offset only works with an id-based cursor",
        )
        # Rewind the query window; the stored cursor itself is only replaced below.
        rewound_id = parsed_cursor.storage_id() - self._pop_events_offset
        fetch_cursor = EventLogCursor.from_storage_id(rewound_id).to_string()

    connection = instance.get_records_for_run(
        run_id,
        fetch_cursor,
        of_type=set(DagsterEventType),
    )
    self._event_cursor = connection.cursor

    unseen_events = []
    fetched_ids = []
    for record in connection.records:
        fetched_ids.append(record.storage_id)
        event = record.event_log_entry.dagster_event
        if event and record.storage_id not in seen_storage_ids:
            unseen_events.append(event)

    # Register the ids only after filtering, so the filter above compares against the ids
    # from earlier calls and not against this batch.
    seen_storage_ids.update(fetched_ids)

    return unseen_events
def test_execute_with_tailer_offset():
    """Run ``foo_job`` with ``DAGSTER_EXECUTOR_POP_EVENTS_OFFSET`` set to 100000.

    The environment variable is set only while the job executes and while waiting on the
    step handler's processes. The run must still report the step-handler start message,
    a ``STEP_START`` event, and overall success.
    """
    TestStepHandler.reset()
    offset_env = {"DAGSTER_EXECUTOR_POP_EVENTS_OFFSET": "100000"}

    with instance_for_test() as instance, environ(offset_env):
        result = execute_job(
            reconstructable(foo_job),
            instance=instance,
            run_config={"execution": {"config": {}}},
        )
        TestStepHandler.wait_for_processes()

    handler_banner = "Starting execution with step handler TestStepHandler"
    banner_hits = [handler_banner in event.message for event in result.all_events]
    assert any(banner_hits)

    step_start_hits = ["STEP_START" in event for event in result.all_events]
    assert any(step_start_hits)

    assert result.success
    assert TestStepHandler.saw_baz_op
    assert TestStepHandler.verify_step_count == 0