diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index dd88a55055..bfc1773a92 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -11,6 +11,7 @@ # Import packages import os +import multiprocessing as mp from multiprocessing import Pool, cpu_count, pool from traceback import format_exception import sys @@ -73,24 +74,65 @@ def run_node(node, updatehash, taskid): # Return the result dictionary return result +# Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of +# pool.Pool().Process(), and the type of the result varies based on the default +# multiprocessing context, so we need to dynamically patch the daemon property +class NonDaemonMixin(object): + @property + def daemon(self): + return False + + @daemon.setter + def daemon(self, val): + pass -class NonDaemonPool(pool.Pool): - """A process pool with non-daemon processes. - """ - def Process(self, *args, **kwds): - proc = super(NonDaemonPool, self).Process(*args, **kwds) - - class NonDaemonProcess(proc.__class__): - """Monkey-patch process to ensure it is never daemonized""" - @property - def daemon(self): - return False - - @daemon.setter - def daemon(self, val): - pass - proc.__class__ = NonDaemonProcess - return proc +try: + from multiprocessing import context + # Exists on all platforms + class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess): + pass + class NonDaemonSpawnContext(context.SpawnContext): + Process = NonDaemonSpawnProcess + _nondaemon_context_mapper = { + 'spawn': NonDaemonSpawnContext() + } + + # POSIX only + try: + class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess): + pass + class NonDaemonForkContext(context.ForkContext): + Process = NonDaemonForkProcess + _nondaemon_context_mapper['fork'] = NonDaemonForkContext() + except AttributeError: + pass + # POSIX only + try: + class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess): + pass + class NonDaemonForkServerContext(context.ForkServerContext): + Process = NonDaemonForkServerProcess + _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext() + except AttributeError: + pass + + class NonDaemonPool(pool.Pool): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None, context=None): + if context is None: + context = mp.get_context() + context = _nondaemon_context_mapper[context._name] + super(NonDaemonPool, self).__init__(processes=processes, + initializer=initializer, + initargs=initargs, + maxtasksperchild=maxtasksperchild, + context=context) + +except ImportError: + class NonDaemonProcess(NonDaemonMixin, mp.Process): + pass + class NonDaemonPool(pool.Pool): + Process = NonDaemonProcess class LegacyMultiProcPlugin(DistributedPluginBase):