Skip to content

Commit

Permalink
MPI_Abort() called when unhandled exception raised.
Browse files Browse the repository at this point in the history
Without this, the MPI process would run indefinitely, which was very annoying.
  • Loading branch information
wwoods committed Oct 20, 2016
1 parent c9992f7 commit 8f8def3
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 14 deletions.
7 changes: 5 additions & 2 deletions bin/job_stream
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ def doRun(hostfile, checkpoint, argv):
# execute the program locally to test, or with `job_stream` for
# parallelization.
mpiEnvVars = [ ('-x', key) for key in env ]
mpiArgs = (mpirun, '-host', hosts, '--map-by', 'node') + tuple(
b for a in mpiEnvVars for b in a)
mpiArgs = (
(mpirun, '-host', hosts)
+ ('--map-by', 'node', '--bind-to', 'none')
+ ('--mca', 'orte_abort_on_non_zero_status', '1')
+ tuple(b for a in mpiEnvVars for b in a))
if not useMpi:
mpiArgs = ()

Expand Down
8 changes: 8 additions & 0 deletions job_stream/job_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ Flags:\n\
}


void mpiAbort() {
if (!mpiEnvHolder) {
ERROR("MPI not initialized, cannot abort");
}
mpiEnvHolder->abort(1);
}


void runProcessor(int argc, char** argv) {
Debug::DeathHandler dh;
dh.set_color_output(false);
Expand Down
4 changes: 4 additions & 0 deletions job_stream/job_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ namespace job_stream {
int getRank();


/** Calls mpi abort() */
void mpiAbort();


/** Add work to the initialWork queue, which overrides stdin or the
argc, argv combination. */
template<typename T>
Expand Down
24 changes: 16 additions & 8 deletions job_stream/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ unsigned int getConcurrency() {
if (cfg) {
std::ifstream f(cfg);
if (!f) {
ERROR("Bad config file: " << cfg);
ERROR("Non-existent config file: " << cfg);
}

std::string sCfg;
Expand Down Expand Up @@ -658,13 +658,26 @@ void Processor::run(const std::string& inputLine) {
//If an exception happens in the main thread, then we will catch it and
//join our threads before re-raising. Otherwise, for whatever reason,
//e.g. the python interpreter does not properly exit.
this->joinThreads();
throw;
this->workerErrors.push_back(std::current_exception());
}

//Stop all threads
this->joinThreads();

//Did we stop because of an error? Log to console, and then abort via MPI.
if (this->workerErrors.size() != 0) {
try {
std::rethrow_exception(this->workerErrors[0]);
}
catch (const std::exception& e) {
JobLog() << "Caught C++ exception: " << e.what();
}
catch (...) {
JobLog() << "Caught C++ exception of non-std::exception type";
}
job_stream::mpiAbort();
}

//Stop timer, report on user vs not
outerTimer.reset();
this->localTimersMerge();
Expand Down Expand Up @@ -746,11 +759,6 @@ void Processor::run(const std::string& inputLine) {
totalTime / 10, totalCpuTime / 10000,
(double)totalCpu / timesTotal, timesTotal * 0.001);
}

//Did we stop because of an error? Rethrow it!
if (this->workerErrors.size() != 0) {
std::rethrow_exception(this->workerErrors[0]);
}
}


Expand Down
2 changes: 1 addition & 1 deletion python/job_stream/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# 1) we don't load dependencies by storing it in __init__.py
# 2) we can import it into setup.py for the same reason
# 3) we can import it into our module
__version__ = '0.1.27'
__version__ = '0.1.28'

7 changes: 5 additions & 2 deletions sphinx/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
Changelog
=========

* 2016-10-18 - Hostfile lines may now specify e.g. `cpus=8` after a hostname to
* 2016-10-20 - Modified Processor behavior to automatically call MPI_Abort when an exception is thrown by a job. This fixes some hangs when errors were raised in Python code, and should make job_stream behave more robustly in general.

The job_stream binary was also modified to pass the ``mca`` flag ``orte_abort_on_non_zero_status``, to make sure that an arbitrary death of a process also causes MPI_Abort to be called.
* 2016-10-18 - Hostfile lines may now specify e.g. ``cpus=8`` after a hostname to
manually set the number of cores to be used by job_stream on a given machine.
* 2016-10-12 - Added `--map-by node` to `job_stream` binary's flags for `mpirun`.
* 2016-10-12 - Added ``--map-by node`` to ``job_stream`` binary's flags for ``mpirun``.
Apparently, newer versions of MPI restrict spawned processes to a single core.
Since job_stream does its own parallelization, this prevented any
parallelization at all on a single machine, which was disastrous.
Expand Down
4 changes: 3 additions & 1 deletion sphinx/main.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ Afterwards, if any script is run via ``job_stream``, it will run on the machines
Will run ``script.py`` and distribute its work across machine1, machine2, and machine4. If you ever want to run a script on something other than the default configured hostfile, ``job_stream`` accepts ``--host`` and ``--hostfile`` arguments (see ``job_stream --help`` for more information).

.. note:: Parameters, such as `cpus=`, must be specified on the same line as the host! Hosts attempt to match themselves to these lines by either shared name or shared IP address.
.. note:: You *must* separate any arguments to your program and ``job_stream``'s arguments with a ``--``, as seen above.

.. note:: Parameters, such as ``cpus=``, must be specified on the same line as the host! Hosts attempt to match themselves to these lines by either shared name or shared IP address.

Checkpoints
~~~~~~~~~~~
Expand Down

0 comments on commit 8f8def3

Please sign in to comment.