Skip to content

Commit

Permalink
BUG: fix tmp cleanup on failed and clean exits (qiime2#251)
Browse files Browse the repository at this point in the history
Fixes qiime2#223.
Fixes qiime2#181.

The following clean up correctly:

- Writing a script using the Artifact API where the artifacts go out of scope at the end of the script (qiime2#223)
- Running the tests without `QIIMETEST` set
- Running a script where the action fails from input
- Modifying the `callable_wrapper` to except in the middle and running the tests

These used to leave behind junk and no longer do.
  • Loading branch information
ebolyen authored and jairideout committed May 17, 2017
1 parent 02e5fe2 commit c6d9554
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 63 deletions.
17 changes: 3 additions & 14 deletions qiime2/core/archive/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import uuid as _uuid
import pathlib
import zipfile
import tempfile
import importlib
import os
import shutil
import io

import qiime2
Expand Down Expand Up @@ -234,10 +232,7 @@ class Archiver:

@classmethod
def _make_temp_path(cls):
# Not using OutPath because it cleans itself up. Archiver should be
# responsible for that.
# TODO: normalize `mkdtemp` when we have framework temp locations.
return pathlib.Path(tempfile.mkdtemp(prefix='qiime2-archive-'))
return qiime2.core.path.ArchivePath()

@classmethod
def get_format_class(cls, version):
Expand Down Expand Up @@ -315,7 +310,6 @@ def from_data(cls, type, format, data_initializer, provenance_capture):
def __init__(self, path, fmt):
self.path = path
self._fmt = fmt
self._pid = os.getpid()

@property
def uuid(self):
Expand All @@ -341,13 +335,8 @@ def root_dir(self):
def provenance_dir(self):
return getattr(self._fmt, 'provenance_dir', None)

def __del__(self):
# Destructor can be called more than once.
if self.path.exists() and self._pid == os.getpid():
shutil.rmtree(str(self.path))

def save(self, filepath):
self.CURRENT_ARCHIVE.save(self.path, filepath)

def orphan(self, pid):
self._pid = pid
def orphan(self):
self.path.orphan()
20 changes: 5 additions & 15 deletions qiime2/core/archive/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import uuid
import copy
import importlib
import pathlib
import shutil
import tempfile
import sys
from datetime import datetime

Expand Down Expand Up @@ -103,11 +101,14 @@ def __init__(self):
# us treat all transformations uniformly.
self.transformers = collections.OrderedDict()

# TODO: normalize `mkdtemp` when we have framework temp locations.
self.path = pathlib.Path(tempfile.mkdtemp(prefix='qiime2-prov-'))
self._build_paths()

def _destructor(self):
self.path._destructor()

def _build_paths(self):
self.path = qiime2.core.path.ProvenancePath()

self.ancestor_dir = self.path / self.ANCESTOR_DIR
self.ancestor_dir.mkdir()

Expand Down Expand Up @@ -213,22 +214,11 @@ def fork(self):
forked.transformers = forked.transformers.copy()
# create a copy of the backing dir so factory (the hard stuff is
# mostly done by this point)
forked.path = pathlib.Path(tempfile.mkdtemp(prefix='qiime2-prov-'))
forked._build_paths()

distutils.dir_util.copy_tree(str(self.path), str(forked.path))

return forked

def __del__(self):
# Used to delete the original source when "forking". Each forked child
# will be a copy, but the original won't ever be renamed, so the
# directory would persist. Instead of adding logic for "first fork"
# it's easier to just let it erase itself should it go out of scope
# without a rename.
if self.path.exists():
shutil.rmtree(str(self.path))


class ImportProvenanceCapture(ProvenanceCapture):
def __init__(self, format=None, checksums=None):
Expand Down
78 changes: 65 additions & 13 deletions qiime2/core/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

import os
import pathlib
import shutil
import distutils
import tempfile
import weakref


_ConcretePath = type(pathlib.Path())
Expand Down Expand Up @@ -55,6 +57,16 @@ def open(self, mode='r', buffering=-1, encoding=None, errors=None,


class OutPath(OwnedPath):
@classmethod
def _destruct(cls, path):
if not os.path.exists(path):
return

if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)

def __new__(cls, dir=False, **kwargs):
"""
Create a tempfile, return pathlib.Path reference to it.
Expand All @@ -63,22 +75,62 @@ def __new__(cls, dir=False, **kwargs):
name = tempfile.mkdtemp(**kwargs)
else:
_, name = tempfile.mkstemp(**kwargs)
return super().__new__(cls, name)

def __del__(self):
self._finalize()
if hasattr(super(), "__del__"):
super().__del__(self)
obj = super().__new__(cls, name)
obj._destructor = weakref.finalize(obj, cls._destruct, str(obj))
return obj

def __exit__(self, t, v, tb):
self._finalize()
self._destructor()
super().__exit__(t, v, tb)

def _finalize(self):
if not self.exists():
return

if self.is_dir():
shutil.rmtree(str(self))
class InternalDirectory(_ConcretePath):
DEFAULT_PREFIX = 'qiime2-'

@classmethod
def _destruct(cls, path):
"""DO NOT USE DIRECTLY, use `_destructor()` instead"""
if os.path.exists(path):
shutil.rmtree(path)

@classmethod
def __new(cls, *args):
self = super().__new__(cls, *args)
self._destructor = weakref.finalize(self, self._destruct, str(self))
return self

def __new__(cls, *args, prefix=None):
if args and prefix is not None:
raise TypeError("Cannot pass a path and a prefix at the same time")
elif args:
# This happens when the base-class's __reduce__ method is invoked
# for pickling.
return cls.__new(*args)
else:
self.unlink()
if prefix is None:
prefix = cls.DEFAULT_PREFIX
elif not prefix.startswith(cls.DEFAULT_PREFIX):
prefix = cls.DEFAULT_PREFIX + prefix
# TODO: normalize when temp-directories are configurable
path = tempfile.mkdtemp(prefix=prefix)
return cls.__new(path)

def __truediv__(self, path):
# We don't want to create self-destructing paths when using the join
# operator
return _ConcretePath(str(self), path)

def __rtruediv__(self, path):
# Same reasoning as truediv
return _ConcretePath(path, str(self))

def orphan(self):
self._destructor.detach()


class ArchivePath(InternalDirectory):
DEFAULT_PREFIX = 'qiime2-archive-'


class ProvenancePath(InternalDirectory):
DEFAULT_PREFIX = 'qiime2-provenance-'
4 changes: 2 additions & 2 deletions qiime2/core/tests/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ def test_new_outpath_context_mgr(self):
self.assertTrue(os.path.isfile(path))
self.assertFalse(os.path.isfile(path))

def test_finalize(self):
def test_destructor(self):
f = OutPath()
path = str(f)

self.assertTrue(os.path.isfile(path))
f._finalize()
f._destructor()
self.assertFalse(os.path.isfile(path))


Expand Down
68 changes: 51 additions & 17 deletions qiime2/sdk/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@
from qiime2.core.util import LateBindingAttribute, DropFirstParameter, tuplize


# These aren't globals as much as a process locals. This is necessary because
# atexit handlers aren't invoked during a subprocess exit (`_exit` is called)
_FAILURE_PROCESS_CLEANUP = []
_ALWAYS_PROCESS_CLEANUP = []


def _async_action(action, args, kwargs):
"""Helper to cleanup because atexit destructors are not called"""
try:
return action(*args, **kwargs)
except: # This is cleanup, even KeyboardInterrupt should be caught
for garbage in _FAILURE_PROCESS_CLEANUP:
garbage._destructor()
raise
finally:
for garbage in _ALWAYS_PROCESS_CLEANUP:
garbage._destructor()


class Action(metaclass=abc.ABCMeta):
type = 'action'

Expand Down Expand Up @@ -132,6 +151,9 @@ def _get_callable_wrapper(self):
def callable_wrapper(*args, **kwargs):
provenance = archive.ActionProvenanceCapture(
self.type, self.package, self.id)
if self._is_subprocess():
_ALWAYS_PROCESS_CLEANUP.append(provenance)

# This function's signature is rewritten below using
# `decorator.decorator`. When the signature is rewritten, args[0]
# is the function whose signature was used to rewrite this
Expand All @@ -155,6 +177,13 @@ def callable_wrapper(*args, **kwargs):
for name in self.signature.inputs:
artifact = artifacts[name] = user_input[name]
provenance.add_input(name, artifact)
if self._is_subprocess():
# Cleanup shouldn't be handled in the subprocess, it
# doesn't own any of these inputs, they were just provided.
# We also can't rely on the subprocess preventing atexit
# hooks as the destructor is also called when the artifact
# goes out of scope (which happens).
artifact._orphan()

parameters = {}
for name, spec in self.signature.parameters.items():
Expand All @@ -169,23 +198,21 @@ def callable_wrapper(*args, **kwargs):

outputs = self._callable_executor_(self._callable, view_args,
output_types, provenance)
# `outputs` matches a Python function's return: either a single
# value is returned, or it is a tuple of return values. Treat both
# cases uniformly.
outputs_tuple = tuplize(outputs)
for output in outputs_tuple:
output._orphan(self._pid)

if len(outputs_tuple) != len(self.signature.outputs):
# The outputs don't need to be orphaned, because their destructors
# aren't invoked in atexit for a subprocess, instead the
# `_async_action` helper will detect failure and cleanup if needed.
# These are meant to be owned by the parent process.

if len(outputs) != len(self.signature.outputs):
raise ValueError(
"Number of callable outputs must match number of outputs "
"defined in signature: %d != %d" %
(len(outputs_tuple), len(self.signature.outputs)))
(len(outputs), len(self.signature.outputs)))

# Wrap in a Results object mapping output name to value so users
# have access to outputs by name or position.
return qiime2.sdk.Results(self.signature.outputs.keys(),
outputs_tuple)
outputs)

callable_wrapper = self._rewrite_wrapper_signature(callable_wrapper)
self._set_wrapper_properties(callable_wrapper, '__call__')
Expand All @@ -210,7 +237,7 @@ def async_wrapper(*args, **kwargs):
args = args[1:]

pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)
future = pool.submit(self, *args, **kwargs)
future = pool.submit(_async_action, self, args, kwargs)
pool.shutdown(wait=False)
return future

Expand All @@ -233,6 +260,9 @@ def _set_wrapper_properties(self, wrapper, name):
# not the "artifact API").
del wrapper.__wrapped__

def _is_subprocess(self):
return self._pid != os.getpid()


class Method(Action):
type = 'method'
Expand All @@ -245,6 +275,7 @@ def _callable_sig_converter_(self, callable):

def _callable_executor_(self, callable, view_args, output_types,
provenance):
is_subprocess = self._is_subprocess()
output_views = callable(**view_args)
output_views = tuplize(output_views)

Expand All @@ -271,11 +302,10 @@ def _callable_executor_(self, callable, view_args, output_types,
spec.qiime_type, output_view, spec.view_type,
provenance.fork())
output_artifacts.append(artifact)
if is_subprocess:
_FAILURE_PROCESS_CLEANUP.append(artifact._archiver)

if len(output_artifacts) == 1:
return output_artifacts[0]
else:
return tuple(output_artifacts)
return tuple(output_artifacts)

@classmethod
def _init(cls, callable, inputs, parameters, outputs, package, name,
Expand Down Expand Up @@ -307,8 +337,12 @@ def _callable_executor_(self, callable, view_args, output_types,
raise TypeError(
"Visualizer %r should not return anything. "
"Received %r as a return value." % (self, ret_val))
return qiime2.sdk.Visualization._from_data_dir(temp_dir,
provenance)
viz = qiime2.sdk.Visualization._from_data_dir(temp_dir,
provenance)
if self._is_subprocess():
_FAILURE_PROCESS_CLEANUP.append(viz._archiver)

return (viz,)

@classmethod
def _init(cls, callable, inputs, parameters, package, name, description,
Expand Down
4 changes: 2 additions & 2 deletions qiime2/sdk/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def export_data(self, output_dir):
# format tranformations may return the invoked transformers
return None

def _orphan(self, pid):
self._archiver.orphan(pid)
def _orphan(self):
self._archiver.orphan()

def save(self, filepath):
if not filepath.endswith(self.extension):
Expand Down

0 comments on commit c6d9554

Please sign in to comment.