[backfills] Add batch inserts for ranged materialization/observation backfills (dagster-io#19862)

## Summary & Motivation

Internal companion PR: dagster-io/internal#8634

See this comment for an up-to-date summary:
dagster-io#19862 (comment)

Add batch insert support for asset materializations and observations
emitted in a single step. Previously, a ranged backfill that generated N
`AssetMaterialization` events performed N DB inserts. After this PR, it
performs only a handful of inserts (more than one, because several tables
are written).

## How it works

- Logged Dagster events take the following pathway:
    - `DagsterEvent.<event_type>` (e.g. `DagsterEvent.asset_materialization`)
    - `DagsterEvent.from_step`
    - `log_dagster_event`
    - `DagsterLogManager.log`
    - `DagsterLogManager._log`
    - `DagsterLogHandler.filter`
    - `DagsterLogHandler.emit`
    - `_EventListenerLogHandler` (in `_core/instance/__init__.py`)
    - `DagsterInstance.handle_new_event`
- This pathway has been modified to pass batch metadata, represented by a
new `DagsterEventBatchMetadata` class, along the entire path.
`DagsterEventBatchMetadata` has two fields: `id` and `is_end`.
- `DagsterEventBatchMetadata` must be passed to the static method that
creates the event (`DagsterEvent.<event_type>`). It is then packaged
alongside the event and passed up the call chain; `DagsterEvent` itself is
not modified. A sketch of the producer side appears after this list.
- Batch management happens in `DagsterInstance.handle_new_event`, which
receives optional `DagsterEventBatchMetadata`. If a call receives batch
metadata, the event is buffered under the batch id. The buffer is flushed
and a write is performed when (a) the buffer hits the `EVENT_BATCH_SIZE`
threshold (read from the `DAGSTER_EVENT_BATCH_SIZE` env var, which
currently defaults to 0, making batching opt-in); or (b)
`DagsterEventBatchMetadata.is_end` is set.
- The write is implemented via a new method,
`EventLogStorage.store_event_batch`. The default implementation simply
loops over the events in the batch and calls `store_event`, i.e. it is
equivalent to the old behavior. The `EventLogStorage` implementations in
cloud and `dagster-postgres` override this method to perform a true batch
insert instead (a sketch of what such an override might look like also
appears below).
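
For concreteness, here is a condensed sketch of the producer side,
following the pattern added in `execute_step.py` in this diff;
`step_context` and `materializations` stand in for the values available
inside a real step:

```python
from dagster._core.events import (
    DagsterEvent,
    DagsterEventBatchMetadata,
    generate_event_batch_id,
)

# Every event in the ranged materialization shares one batch id; only the
# final event sets `is_end=True`, which tells the instance to flush the
# buffer for this batch.
batch_id = generate_event_batch_id()
last_index = len(materializations) - 1
for i, materialization in enumerate(materializations):
    batch_metadata = DagsterEventBatchMetadata(id=batch_id, is_end=(i == last_index))
    DagsterEvent.asset_materialization(step_context, materialization, batch_metadata)
```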
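
The actual `dagster-postgres` override is not part of this excerpt; the
following is only a rough sketch of the idea behind `store_event_batch`,
assuming a SQLAlchemy-backed storage where `self._engine` and
`self._event_log_table` are hypothetical attributes and the column names
are illustrative rather than the real Dagster schema:

```python
from typing import Sequence

import sqlalchemy as db

from dagster._core.events.log import EventLogEntry
from dagster._serdes import serialize_value


class BatchingSqlEventLogStorage:  # hypothetical storage class
    def store_event_batch(self, events: Sequence[EventLogEntry]) -> None:
        # Issue one multi-row INSERT instead of N single-row round trips.
        rows = [
            {
                "run_id": event.run_id,
                "event": serialize_value(event),
                "timestamp": event.timestamp,
            }
            for event in events
        ]
        with self._engine.begin() as conn:
            conn.execute(db.insert(self._event_log_table), rows)
```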

## How I Tested These Changes

- Add a new storage test that performs a single-run partition range
materialization. The organization of the storage tests is unfamiliar to
me, so I'm not sure I put it in the appropriate place. A rough sketch of
the shape of this test follows below.
- Add a simple single-run backfill test that actually executes the full
backfill.
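
A rough sketch of the shape of the single-run range test, with
hypothetical asset and IO-manager names (the real tests live in the
Dagster storage/backfill test suites and use their own fixtures). The
partition-range run tags are what make one run cover the whole range:

```python
from dagster import (
    DagsterEventType,
    DagsterInstance,
    DailyPartitionsDefinition,
    EventRecordsFilter,
    IOManager,
    asset,
    io_manager,
    materialize,
)

daily = DailyPartitionsDefinition(start_date="2024-01-01")


@asset(partitions_def=daily)
def my_asset() -> None:
    # A single step covers the whole range; Dagster emits one
    # AssetMaterialization event per partition key in the range.
    pass


class _NoOpIOManager(IOManager):
    # The default IO manager cannot address a multi-partition output,
    # so skip output storage entirely for this sketch.
    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        return None


@io_manager
def noop_io_manager(_):
    return _NoOpIOManager()


def test_single_run_partition_range_materialization() -> None:
    instance = DagsterInstance.ephemeral()
    result = materialize(
        [my_asset],
        instance=instance,
        resources={"io_manager": noop_io_manager},
        tags={
            "dagster/asset_partition_range_start": "2024-01-01",
            "dagster/asset_partition_range_end": "2024-01-03",
        },
    )
    assert result.success
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=my_asset.key,
        )
    )
    assert len(records) == 3  # one per partition in the range
```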
smackesey authored Mar 25, 2024
1 parent c1d752f commit 52c7830
Showing 11 changed files with 408 additions and 104 deletions.
33 changes: 30 additions & 3 deletions python_modules/dagster/dagster/_core/events/__init__.py
@@ -3,6 +3,7 @@
import logging
import os
import sys
import uuid
from enum import Enum
from typing import (
    TYPE_CHECKING,
@@ -243,6 +244,12 @@ class DagsterEventType(str, Enum):

PIPELINE_RUN_STATUS_TO_EVENT_TYPE = {v: k for k, v in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.items()}

# These are the only events currently supported in `EventLogStorage.store_event_batch`
BATCH_WRITABLE_EVENTS = {
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.ASSET_OBSERVATION,
}

ASSET_EVENTS = {
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.ASSET_OBSERVATION,
@@ -327,14 +334,23 @@ def _validate_event_specific_data(
    return event_specific_data


def generate_event_batch_id():
    return str(uuid.uuid4())


def log_step_event(
    step_context: IStepContext,
    event: "DagsterEvent",
    batch_metadata: Optional["DagsterEventBatchMetadata"],
) -> None:
    event_type = DagsterEventType(event.event_type_value)
    log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG

    step_context.log.log_dagster_event(
        level=log_level,
        msg=event.message or f"{event_type} for step {step_context.step.key}",
        dagster_event=event,
        batch_metadata=batch_metadata,
    )


@@ -393,6 +409,11 @@ def handle_unpack_error(
    )


class DagsterEventBatchMetadata(NamedTuple):
    id: str
    is_end: bool


@whitelist_for_serdes(
    serializer=DagsterEventSerializer,
    storage_field_names={
@@ -439,6 +460,7 @@ def from_step(
        step_context: IStepContext,
        event_specific_data: Optional["EventSpecificData"] = None,
        message: Optional[str] = None,
        batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
    ) -> "DagsterEvent":
        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
@@ -452,7 +474,7 @@
            pid=os.getpid(),
        )

        log_step_event(step_context, event, batch_metadata)

        return event

@@ -982,6 +1004,7 @@ def step_skipped_event(step_context: IStepContext) -> "DagsterEvent":
    def asset_materialization(
        step_context: IStepContext,
        materialization: AssetMaterialization,
        batch_metadata: Optional[DagsterEventBatchMetadata] = None,
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
@@ -994,16 +1017,20 @@
                    label_clause=f" {materialization.label}" if materialization.label else ""
                )
            ),
            batch_metadata=batch_metadata,
        )

    @staticmethod
    def asset_observation(
        step_context: IStepContext,
        observation: AssetObservation,
        batch_metadata: Optional[DagsterEventBatchMetadata] = None,
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            step_context=step_context,
            event_specific_data=AssetObservationData(observation),
            batch_metadata=batch_metadata,
        )

    @staticmethod
40 changes: 24 additions & 16 deletions python_modules/dagster/dagster/_core/execution/plan/execute_step.py
@@ -61,7 +61,7 @@
    DagsterTypeCheckError,
    user_code_error_boundary,
)
from dagster._core.events import DagsterEvent, DagsterEventBatchMetadata, generate_event_batch_id
from dagster._core.execution.context.compute import enter_execution_context
from dagster._core.execution.context.output import OutputContext
from dagster._core.execution.context.system import StepExecutionContext, TypeCheckContext
@@ -666,6 +667,7 @@ def _get_output_asset_events(

    if execution_type == AssetExecutionType.MATERIALIZATION:
        event_class = AssetMaterialization
    elif execution_type == AssetExecutionType.OBSERVATION:
        event_class = AssetObservation
    else:
@@ -922,26 +923,33 @@ def _log_materialization_or_observation_events_for_asset(
            f"Unexpected asset execution type {execution_type}",
        )

    asset_events = list(
        _get_output_asset_events(
            asset_key,
            partitions,
            output,
            output_def,
            manager_metadata,
            step_context,
            execution_type,
        )
    )

    batch_id = generate_event_batch_id()
    last_index = len(asset_events) - 1
    for i, asset_event in enumerate(asset_events):
        batch_metadata = (
            DagsterEventBatchMetadata(batch_id, i == last_index) if partitions else None
        )
        yield _dagster_event_for_asset_event(step_context, asset_event, batch_metadata)


def _dagster_event_for_asset_event(
    step_context: StepExecutionContext,
    asset_event: Union[AssetMaterialization, AssetObservation],
    batch_metadata: Optional[DagsterEventBatchMetadata],
):
    if isinstance(asset_event, AssetMaterialization):
        return DagsterEvent.asset_materialization(step_context, asset_event, batch_metadata)
    else:  # observation
        return DagsterEvent.asset_observation(step_context, asset_event, batch_metadata)
83 changes: 74 additions & 9 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -16,6 +16,7 @@
    Any,
    Callable,
    Dict,
    Final,
    Generic,
    Iterable,
    List,
@@ -103,6 +104,13 @@
RUNLESS_RUN_ID = ""
RUNLESS_JOB_NAME = ""

# Sets the number of events that will be buffered before being written to the event log. Only
# applies to explicitly batched events. Currently this defaults to 0, which turns off batching
# entirely (multiple store_event calls are made instead of store_event_batch). This makes batching
# opt-in.
EVENT_BATCH_SIZE: Final = int(os.getenv("DAGSTER_EVENT_BATCH_SIZE", "0"))


if TYPE_CHECKING:
    from dagster._core.debug import DebugRunPayload
    from dagster._core.definitions.asset_check_spec import AssetCheckKey
@@ -122,6 +130,7 @@
    from dagster._core.events import (
        AssetMaterialization,
        DagsterEvent,
        DagsterEventBatchMetadata,
        DagsterEventType,
        EngineEventData,
    )
@@ -182,6 +191,10 @@
DagsterInstanceOverrides: TypeAlias = Mapping[str, Any]


def _is_batch_writing_enabled() -> bool:
    return EVENT_BATCH_SIZE > 0


def _check_run_equality(
    pipeline_run: DagsterRun, candidate_run: DagsterRun
) -> Mapping[str, Tuple[Any, Any]]:
@@ -224,18 +237,21 @@ def emit(self, record: logging.LogRecord) -> None:
        from dagster._core.events import EngineEventData
        from dagster._core.events.log import StructuredLoggerMessage, construct_event_record

        record_metadata = get_log_record_metadata(record)
        event = construct_event_record(
            StructuredLoggerMessage(
                name=record.name,
                message=record.msg,
                level=record.levelno,
                meta=record_metadata,
                record=record,
            )
        )

        try:
            self._instance.handle_new_event(
                event, batch_metadata=record_metadata["dagster_event_batch_metadata"]
            )
        except Exception as e:
            sys.stderr.write(f"Exception while writing logger call to event log: {e}\n")
            if event.dagster_event:
@@ -479,6 +495,9 @@ def __init__(
                " them. Consider switching to Postgres or Mysql.",
            )

        # Used for batched event handling
        self._event_buffer: Dict[str, List[EventLogEntry]] = defaultdict(list)

    # ctors

    @public
@@ -2355,16 +2374,62 @@ def get_handlers(self) -> Sequence[logging.Handler]:
    def store_event(self, event: "EventLogEntry") -> None:
        self._event_storage.store_event(event)

    def handle_new_event(
        self,
        event: "EventLogEntry",
        *,
        batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
    ) -> None:
        """Handle a new event by storing it and notifying subscribers.

        Events may optionally be sent with `batch_metadata`. If batch writing is enabled, then
        events sent with `batch_metadata` will not trigger an immediate write. Instead, they
        will be kept in a batch-specific buffer (identified by `batch_metadata.id`) until either
        the buffer reaches EVENT_BATCH_SIZE or the end of the batch is reached (signaled by
        `batch_metadata.is_end`). When this point is reached, all events in the buffer will be
        sent to the storage layer in a single batch. If an error occurs during batch writing,
        then we fall back to iterative individual event writes.

        Args:
            event (EventLogEntry): The event to handle.
            batch_metadata (Optional[DagsterEventBatchMetadata]): Metadata for batch writing.
        """
        if batch_metadata is None or not _is_batch_writing_enabled():
            events = [event]
        else:
            batch_id, is_batch_end = batch_metadata.id, batch_metadata.is_end
            self._event_buffer[batch_id].append(event)
            if is_batch_end or len(self._event_buffer[batch_id]) == EVENT_BATCH_SIZE:
                events = self._event_buffer[batch_id]
                del self._event_buffer[batch_id]
            else:
                return

        if len(events) == 1:
            self._event_storage.store_event(events[0])
        else:
            try:
                self._event_storage.store_event_batch(events)

            # Fall back to storing events one by one if writing a batch fails. We catch a
            # generic Exception because that is the parent class of the actually received
            # error, dagster_cloud_cli.core.errors.GraphQLStorageError, which we cannot import
            # here due to it living in a cloud package.
            except Exception as e:
                sys.stderr.write(f"Exception while storing event batch: {e}\n")
                sys.stderr.write(
                    "Falling back to storing multiple single-event storage requests...\n"
                )
                for event in events:
                    self._event_storage.store_event(event)

        for event in events:
            run_id = event.run_id
            if event.is_dagster_event and event.get_dagster_event().is_job_event:
                self._run_storage.handle_run_event(run_id, event.get_dagster_event())

            for sub in self._subscribers[run_id]:
                sub(event)

    def add_event_listener(self, run_id: str, cb) -> None:
        self._subscribers[run_id].append(cb)