From c6d9554f36b039c40ec0d5bb966c564ff33f410e Mon Sep 17 00:00:00 2001 From: Evan Bolyen Date: Tue, 16 May 2017 17:33:39 -0700 Subject: [PATCH] BUG: fix tmp cleanup on failed and clean exits (#251) Fixes #223. Fixes #181. The following clean up correctly: - Writing a script using the Artifact API where the artifacts go out of scope at the end of the script (#223) - Running the tests without `QIIMETEST` set - Running a script where the action fails from input - Modifying the `callable_wrapper` to except in the middle and running the tests These used to leave behind junk and no longer do. --- qiime2/core/archive/archiver.py | 17 ++----- qiime2/core/archive/provenance.py | 20 ++------ qiime2/core/path.py | 78 +++++++++++++++++++++++++------ qiime2/core/tests/test_path.py | 4 +- qiime2/sdk/action.py | 68 ++++++++++++++++++++------- qiime2/sdk/result.py | 4 +- 6 files changed, 128 insertions(+), 63 deletions(-) diff --git a/qiime2/core/archive/archiver.py b/qiime2/core/archive/archiver.py index e98c04cb..deecc9b5 100644 --- a/qiime2/core/archive/archiver.py +++ b/qiime2/core/archive/archiver.py @@ -10,10 +10,8 @@ import uuid as _uuid import pathlib import zipfile -import tempfile import importlib import os -import shutil import io import qiime2 @@ -234,10 +232,7 @@ class Archiver: @classmethod def _make_temp_path(cls): - # Not using OutPath because it cleans itself up. Archiver should be - # responsible for that. - # TODO: normalize `mkdtemp` when we have framework temp locations. 
- return pathlib.Path(tempfile.mkdtemp(prefix='qiime2-archive-')) + return qiime2.core.path.ArchivePath() @classmethod def get_format_class(cls, version): @@ -315,7 +310,6 @@ def from_data(cls, type, format, data_initializer, provenance_capture): def __init__(self, path, fmt): self.path = path self._fmt = fmt - self._pid = os.getpid() @property def uuid(self): @@ -341,13 +335,8 @@ def root_dir(self): def provenance_dir(self): return getattr(self._fmt, 'provenance_dir', None) - def __del__(self): - # Destructor can be called more than once. - if self.path.exists() and self._pid == os.getpid(): - shutil.rmtree(str(self.path)) - def save(self, filepath): self.CURRENT_ARCHIVE.save(self.path, filepath) - def orphan(self, pid): - self._pid = pid + def orphan(self): + self.path.orphan() diff --git a/qiime2/core/archive/provenance.py b/qiime2/core/archive/provenance.py index ff75b67e..2da242ed 100644 --- a/qiime2/core/archive/provenance.py +++ b/qiime2/core/archive/provenance.py @@ -12,9 +12,7 @@ import uuid import copy import importlib -import pathlib import shutil -import tempfile import sys from datetime import datetime @@ -103,11 +101,14 @@ def __init__(self): # us treat all transformations uniformly. self.transformers = collections.OrderedDict() - # TODO: normalize `mkdtemp` when we have framework temp locations. 
- self.path = pathlib.Path(tempfile.mkdtemp(prefix='qiime2-prov-')) self._build_paths() + def _destructor(self): + self.path._destructor() + def _build_paths(self): + self.path = qiime2.core.path.ProvenancePath() + self.ancestor_dir = self.path / self.ANCESTOR_DIR self.ancestor_dir.mkdir() @@ -213,22 +214,11 @@ def fork(self): forked.transformers = forked.transformers.copy() # create a copy of the backing dir so factory (the hard stuff is # mostly done by this point) - forked.path = pathlib.Path(tempfile.mkdtemp(prefix='qiime2-prov-')) forked._build_paths() - distutils.dir_util.copy_tree(str(self.path), str(forked.path)) return forked - def __del__(self): - # Used to delete the original source when "forking". Each forked child - # will be a copy, but the original won't ever be renamed, so the - # directory would persist. Instead of adding logic for "first fork" - # it's easier to just let it erase itself should it go out of scope - # without a rename. - if self.path.exists(): - shutil.rmtree(str(self.path)) - class ImportProvenanceCapture(ProvenanceCapture): def __init__(self, format=None, checksums=None): diff --git a/qiime2/core/path.py b/qiime2/core/path.py index 3de0ba2e..5bd07c96 100644 --- a/qiime2/core/path.py +++ b/qiime2/core/path.py @@ -6,10 +6,12 @@ # The full license is in the file LICENSE, distributed with this software. # ---------------------------------------------------------------------------- +import os import pathlib import shutil import distutils import tempfile +import weakref _ConcretePath = type(pathlib.Path()) @@ -55,6 +57,16 @@ def open(self, mode='r', buffering=-1, encoding=None, errors=None, class OutPath(OwnedPath): + @classmethod + def _destruct(cls, path): + if not os.path.exists(path): + return + + if os.path.isdir(path): + shutil.rmtree(path) + else: + os.unlink(path) + def __new__(cls, dir=False, **kwargs): """ Create a tempfile, return pathlib.Path reference to it. 
@@ -63,22 +75,62 @@ def __new__(cls, dir=False, **kwargs): name = tempfile.mkdtemp(**kwargs) else: _, name = tempfile.mkstemp(**kwargs) - return super().__new__(cls, name) - - def __del__(self): - self._finalize() - if hasattr(super(), "__del__"): - super().__del__(self) + obj = super().__new__(cls, name) + obj._destructor = weakref.finalize(obj, cls._destruct, str(obj)) + return obj def __exit__(self, t, v, tb): - self._finalize() + self._destructor() super().__exit__(t, v, tb) - def _finalize(self): - if not self.exists(): - return - if self.is_dir(): - shutil.rmtree(str(self)) +class InternalDirectory(_ConcretePath): + DEFAULT_PREFIX = 'qiime2-' + + @classmethod + def _destruct(cls, path): + """DO NOT USE DIRECTLY, use `_destructor()` instead""" + if os.path.exists(path): + shutil.rmtree(path) + + @classmethod + def __new(cls, *args): + self = super().__new__(cls, *args) + self._destructor = weakref.finalize(self, self._destruct, str(self)) + return self + + def __new__(cls, *args, prefix=None): + if args and prefix is not None: + raise TypeError("Cannot pass a path and a prefix at the same time") + elif args: + # This happens when the base-class's __reduce__ method is invoked + # for pickling. 
+ return cls.__new(*args) else: - self.unlink() + if prefix is None: + prefix = cls.DEFAULT_PREFIX + elif not prefix.startswith(cls.DEFAULT_PREFIX): + prefix = cls.DEFAULT_PREFIX + prefix + # TODO: normalize when temp-directories are configurable + path = tempfile.mkdtemp(prefix=prefix) + return cls.__new(path) + + def __truediv__(self, path): + # We don't want to create self-destructing paths when using the join + # operator + return _ConcretePath(str(self), path) + + def __rtruediv__(self, path): + # Same reasoning as truediv + return _ConcretePath(path, str(self)) + + def orphan(self): + self._destructor.detach() + + +class ArchivePath(InternalDirectory): + DEFAULT_PREFIX = 'qiime2-archive-' + + +class ProvenancePath(InternalDirectory): + DEFAULT_PREFIX = 'qiime2-provenance-' diff --git a/qiime2/core/tests/test_path.py b/qiime2/core/tests/test_path.py index b4b4d68c..7fa4feb8 100644 --- a/qiime2/core/tests/test_path.py +++ b/qiime2/core/tests/test_path.py @@ -29,12 +29,12 @@ def test_new_outpath_context_mgr(self): self.assertTrue(os.path.isfile(path)) self.assertFalse(os.path.isfile(path)) - def test_finalize(self): + def test_destructor(self): f = OutPath() path = str(f) self.assertTrue(os.path.isfile(path)) - f._finalize() + f._destructor() self.assertFalse(os.path.isfile(path)) diff --git a/qiime2/sdk/action.py b/qiime2/sdk/action.py index 299f0661..26b79ecf 100644 --- a/qiime2/sdk/action.py +++ b/qiime2/sdk/action.py @@ -20,6 +20,25 @@ from qiime2.core.util import LateBindingAttribute, DropFirstParameter, tuplize +# These aren't globals as much as process locals. 
This is necessary because +# atexit handlers aren't invoked during a subprocess exit (`_exit` is called) +_FAILURE_PROCESS_CLEANUP = [] +_ALWAYS_PROCESS_CLEANUP = [] + + +def _async_action(action, args, kwargs): + """Helper to cleanup because atexit destructors are not called""" + try: + return action(*args, **kwargs) + except: # This is cleanup, even KeyboardInterrupt should be caught + for garbage in _FAILURE_PROCESS_CLEANUP: + garbage._destructor() + raise + finally: + for garbage in _ALWAYS_PROCESS_CLEANUP: + garbage._destructor() + + class Action(metaclass=abc.ABCMeta): type = 'action' @@ -132,6 +151,9 @@ def _get_callable_wrapper(self): def callable_wrapper(*args, **kwargs): provenance = archive.ActionProvenanceCapture( self.type, self.package, self.id) + if self._is_subprocess(): + _ALWAYS_PROCESS_CLEANUP.append(provenance) + # This function's signature is rewritten below using # `decorator.decorator`. When the signature is rewritten, args[0] # is the function whose signature was used to rewrite this @@ -155,6 +177,13 @@ def callable_wrapper(*args, **kwargs): for name in self.signature.inputs: artifact = artifacts[name] = user_input[name] provenance.add_input(name, artifact) + if self._is_subprocess(): + # Cleanup shouldn't be handled in the subprocess, it + # doesn't own any of these inputs, they were just provided. + # We also can't rely on the subprocess preventing atexit + # hooks as the destructor is also called when the artifact + # goes out of scope (which happens). + artifact._orphan() parameters = {} for name, spec in self.signature.parameters.items(): @@ -169,23 +198,21 @@ def callable_wrapper(*args, **kwargs): outputs = self._callable_executor_(self._callable, view_args, output_types, provenance) - # `outputs` matches a Python function's return: either a single - # value is returned, or it is a tuple of return values. Treat both - # cases uniformly. 
- outputs_tuple = tuplize(outputs) - for output in outputs_tuple: - output._orphan(self._pid) - - if len(outputs_tuple) != len(self.signature.outputs): + # The outputs don't need to be orphaned, because their destructors + # aren't invoked in atexit for a subprocess, instead the + # `_async_action` helper will detect failure and cleanup if needed. + # These are meant to be owned by the parent process. + + if len(outputs) != len(self.signature.outputs): raise ValueError( "Number of callable outputs must match number of outputs " "defined in signature: %d != %d" % - (len(outputs_tuple), len(self.signature.outputs))) + (len(outputs), len(self.signature.outputs))) # Wrap in a Results object mapping output name to value so users # have access to outputs by name or position. return qiime2.sdk.Results(self.signature.outputs.keys(), - outputs_tuple) + outputs) callable_wrapper = self._rewrite_wrapper_signature(callable_wrapper) self._set_wrapper_properties(callable_wrapper, '__call__') @@ -210,7 +237,7 @@ def async_wrapper(*args, **kwargs): args = args[1:] pool = concurrent.futures.ProcessPoolExecutor(max_workers=1) - future = pool.submit(self, *args, **kwargs) + future = pool.submit(_async_action, self, args, kwargs) pool.shutdown(wait=False) return future @@ -233,6 +260,9 @@ def _set_wrapper_properties(self, wrapper, name): # not the "artifact API"). 
del wrapper.__wrapped__ + def _is_subprocess(self): + return self._pid != os.getpid() + class Method(Action): type = 'method' @@ -245,6 +275,7 @@ def _callable_sig_converter_(self, callable): def _callable_executor_(self, callable, view_args, output_types, provenance): + is_subprocess = self._is_subprocess() output_views = callable(**view_args) output_views = tuplize(output_views) @@ -271,11 +302,10 @@ def _callable_executor_(self, callable, view_args, output_types, spec.qiime_type, output_view, spec.view_type, provenance.fork()) output_artifacts.append(artifact) + if is_subprocess: + _FAILURE_PROCESS_CLEANUP.append(artifact._archiver) - if len(output_artifacts) == 1: - return output_artifacts[0] - else: - return tuple(output_artifacts) + return tuple(output_artifacts) @classmethod def _init(cls, callable, inputs, parameters, outputs, package, name, @@ -307,8 +337,12 @@ def _callable_executor_(self, callable, view_args, output_types, raise TypeError( "Visualizer %r should not return anything. " "Received %r as a return value." % (self, ret_val)) - return qiime2.sdk.Visualization._from_data_dir(temp_dir, - provenance) + viz = qiime2.sdk.Visualization._from_data_dir(temp_dir, + provenance) + if self._is_subprocess(): + _FAILURE_PROCESS_CLEANUP.append(viz._archiver) + + return (viz,) @classmethod def _init(cls, callable, inputs, parameters, package, name, description, diff --git a/qiime2/sdk/result.py b/qiime2/sdk/result.py index aaa30d1f..abf111e8 100644 --- a/qiime2/sdk/result.py +++ b/qiime2/sdk/result.py @@ -126,8 +126,8 @@ def export_data(self, output_dir): # format tranformations may return the invoked transformers return None - def _orphan(self, pid): - self._archiver.orphan(pid) + def _orphan(self): + self._archiver.orphan() def save(self, filepath): if not filepath.endswith(self.extension):