Skip to content

Commit

Permalink
[ext] add protocol version to all messages (#16633)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Add `dagster_ext_version` field to all ext messages.

## How I Tested These Changes

New tests.
  • Loading branch information
smackesey authored Sep 22, 2023
1 parent f7b3fee commit 6aa2f40
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
11 changes: 10 additions & 1 deletion python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
# ##### PROTOCOL
# ########################

# This represents the version of the protocol, rather than the version of the package. It must be
# manually updated whenever there are changes to the protocol.
EXT_PROTOCOL_VERSION = "0.1"

ExtExtras = Mapping[str, Any]
ExtParams = Mapping[str, Any]
Expand All @@ -61,8 +64,12 @@ def _param_name_to_env_key(key: str) -> str:

# ##### MESSAGE

# Can't use a constant for TypedDict key so this value is repeated in `ExtMessage` defn.
EXT_PROTOCOL_VERSION_FIELD = "__dagster_ext_version"


class ExtMessage(TypedDict):
__dagster_ext_version: str
method: str
params: Optional[Mapping[str, Any]]

Expand Down Expand Up @@ -670,7 +677,9 @@ def __init__(
self._materialized_assets: set[str] = set()

def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None:
message = ExtMessage(method=method, params=params)
message = ExtMessage(
{EXT_PROTOCOL_VERSION_FIELD: EXT_PROTOCOL_VERSION, "method": method, "params": params}
)
self._message_channel.write_message(message)

# ########################
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/ext/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Iterator, Optional

from dagster_ext import (
EXT_PROTOCOL_VERSION_FIELD,
ExtContextData,
ExtDefaultContextLoader,
ExtDefaultMessageWriter,
Expand Down Expand Up @@ -177,8 +178,7 @@ def extract_message_or_forward_to_stdout(handler: "ExtMessageHandler", log_line:
# exceptions as control flow, you love to see it
try:
message = json.loads(log_line)
# need better message check
if message.keys() == {"method", "params"}:
if EXT_PROTOCOL_VERSION_FIELD in message.keys():
handler.handle_message(message)
except Exception:
# move non-message logs in to stdout for compute log capture
Expand Down

0 comments on commit 6aa2f40

Please sign in to comment.