Skip to content

Commit

Permalink
[pipes] small improvements to make Pipes more test-friendly (#26470)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Small Pipes edits which makes it easier to test Pipes (needed for Java
pipes tests)

## How I Tested These Changes

Existing tests
  • Loading branch information
danielgafni authored Dec 16, 2024
1 parent 51bdd79 commit b9a5069
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
5 changes: 2 additions & 3 deletions python_modules/dagster/dagster/_core/pipes/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,8 @@ def _resolve_metadata(
k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items()
}

def _resolve_metadata_value(
self, value: Any, metadata_type: PipesMetadataType
) -> MetadataValue:
@staticmethod
def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue:
if metadata_type == PIPES_METADATA_TYPE_INFER:
return normalize_metadata_value(value)
elif metadata_type == "text":
Expand Down
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,18 @@ class PipesFileMessageReader(PipesMessageReader):
Args:
path (str): The path of the file to which messages will be written. The file will be deleted
on close of the pipes session.
include_stdio_in_messages (bool): Whether to include stdout/stderr logs in the messages produced by the message writer in the external process.
cleanup_file (bool): Whether to delete the file on close of the pipes session.
"""

def __init__(self, path: str, include_stdio_in_messages: bool = False):
def __init__(
self, path: str, include_stdio_in_messages: bool = False, cleanup_file: bool = True
):
self._path = check.str_param(path, "path")
self._include_stdio_in_messages = check.bool_param(
include_stdio_in_messages, "include_stdio_in_messages"
)
self._cleanup_file = cleanup_file

def on_launched(self, params: PipesLaunchedData) -> None:
self.launched_payload = params
Expand Down Expand Up @@ -178,7 +183,7 @@ def read_messages(
is_session_closed.set()
if thread:
thread.join()
if os.path.exists(self._path):
if os.path.exists(self._path) and self._cleanup_file:
os.remove(self._path)

def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None:
Expand Down

0 comments on commit b9a5069

Please sign in to comment.