diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index a0533ce3379f9..b5d82e9454927 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -132,9 +132,8 @@ def _resolve_metadata( k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items() } - def _resolve_metadata_value( - self, value: Any, metadata_type: PipesMetadataType - ) -> MetadataValue: + @staticmethod + def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue: if metadata_type == PIPES_METADATA_TYPE_INFER: return normalize_metadata_value(value) elif metadata_type == "text": diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py index add8889626ca5..e161c6f2e3bba 100644 --- a/python_modules/dagster/dagster/_core/pipes/utils.py +++ b/python_modules/dagster/dagster/_core/pipes/utils.py @@ -134,13 +134,18 @@ class PipesFileMessageReader(PipesMessageReader): Args: path (str): The path of the file to which messages will be written. The file will be deleted on close of the pipes session. + include_stdio_in_messages (bool): Whether to include stdout/stderr logs in the messages produced by the message writer in the external process. + cleanup_file (bool): Whether to delete the file on close of the pipes session. """ - def __init__(self, path: str, include_stdio_in_messages: bool = False): + def __init__( + self, path: str, include_stdio_in_messages: bool = False, cleanup_file: bool = True + ): self._path = check.str_param(path, "path") self._include_stdio_in_messages = check.bool_param( include_stdio_in_messages, "include_stdio_in_messages" ) + self._cleanup_file = cleanup_file def on_launched(self, params: PipesLaunchedData) -> None: self.launched_payload = params @@ -178,7 +183,7 @@ def read_messages( is_session_closed.set() if thread: thread.join() - if os.path.exists(self._path): + if os.path.exists(self._path) and self._cleanup_file: os.remove(self._path) def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None: