From 77b91597e3c45417d50d53321c61771e6a673a20 Mon Sep 17 00:00:00 2001 From: Jay Carey Date: Tue, 8 Jun 2021 14:20:48 -0400 Subject: [PATCH] Jc fix small data race condition (#1690) * [bugfix] Pass MAX_RECORDS_IN_RAM to BasecallsConverter * Use a different thread pool executor for each tile to ensure no race conditions can occur --- .../illumina/SortedBasecallsConverter.java | 56 +++++++------------ .../ThreadPoolExecutorWithExceptions.java | 9 +++ 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/src/main/java/picard/illumina/SortedBasecallsConverter.java b/src/main/java/picard/illumina/SortedBasecallsConverter.java index 46af4a071c..f95f051358 100644 --- a/src/main/java/picard/illumina/SortedBasecallsConverter.java +++ b/src/main/java/picard/illumina/SortedBasecallsConverter.java @@ -20,7 +20,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; /** * SortedBasecallsConverter utilizes an underlying IlluminaDataProvider to convert parsed and decoded sequencing data @@ -44,11 +43,10 @@ public class SortedBasecallsConverter extends BasecallsCo private final int maxReadsInRamPerTile; private final List tmpDirs; private final Map> completedWork = new ConcurrentHashMap<>(); - private final ThreadPoolExecutorWithExceptions tileWriteExecutor; private final ThreadPoolExecutorWithExceptions tileReadExecutor; private final ProgressLogger readProgressLogger = new ProgressLogger(log, 1000000, "Read"); private final ProgressLogger writeProgressLogger = new ProgressLogger(log, 1000000, "Write"); - private final AtomicInteger tileWriteJobs = new AtomicInteger(0); + private final Integer numThreads; /** * Constructs a new SortedBaseCallsConverter. @@ -104,7 +102,7 @@ protected SortedBasecallsConverter( this.codecPrototype = codecPrototype; this.outputRecordComparator = outputRecordComparator; this.outputRecordClass = outputRecordClass; - tileWriteExecutor = new ThreadPoolExecutorWithExceptions(numThreads); + this.numThreads = numThreads; tileReadExecutor = new ThreadPoolExecutorWithExceptions(numThreads); } @@ -141,19 +139,11 @@ private class SortedRecordToWriterPump implements Runnable { @Override public void run() { - tileWriteJobs.incrementAndGet(); for (final CLUSTER_OUTPUT_RECORD record : recordCollection) { writer.write(record); writeProgressLogger.record(null, 0); } recordCollection.cleanup(); - int writeJobsRemaining = tileWriteJobs.decrementAndGet(); - - if (writeJobsRemaining == 0) { - synchronized (tileWriteJobs) { - tileWriteJobs.notifyAll(); - } - } } } @@ -234,38 +224,32 @@ private synchronized SortingCollection createSortingColle protected void awaitTileProcessingCompletion() throws IOException { tileReadExecutor.shutdown(); // Wait for all the read threads to complete before checking for errors - ThreadPoolExecutorUtil.awaitThreadPoolTermination("Reading executor", tileReadExecutor, Duration.ofMinutes(5)); - - // Check for reading errors - if (tileReadExecutor.hasError()) { - interruptAndShutdownExecutors(tileReadExecutor, tileWriteExecutor); - } + awaitExecutor(tileReadExecutor); int tileProcessingIndex = 0; - + ThreadPoolExecutorWithExceptions tileWriteExecutor = null; while (tileProcessingIndex < tiles.size()) { - if (tileWriteJobs.get() == 0) { + awaitExecutor(tileWriteExecutor); + tileWriteExecutor = new ThreadPoolExecutorWithExceptions(numThreads); completedWork.get(tiles.get(tileProcessingIndex)).forEach(tileWriteExecutor::submit); tileProcessingIndex++; - try { - synchronized (tileWriteJobs) { - tileWriteJobs.wait(); - // Short sleep to ensure data is flushed. - Thread.sleep(500); - } - } catch (InterruptedException e) { - throw new PicardException("Error waiting for thread lock during tile processing.", e); - } - } } - tileWriteExecutor.shutdown(); - ThreadPoolExecutorUtil.awaitThreadPoolTermination("Writing executor", tileWriteExecutor, Duration.ofMinutes(5)); + awaitExecutor(tileWriteExecutor); + closeWriters(); + } + + private void awaitExecutor(ThreadPoolExecutorWithExceptions executor) { + if (executor != null) { + executor.shutdown(); + ThreadPoolExecutorUtil.awaitThreadPoolTermination("Writing executor", executor, Duration.ofMinutes(5)); - // Check for tile work synchronization errors - if (tileWriteExecutor.hasError()) { - interruptAndShutdownExecutors(tileWriteExecutor); + // Check for tile work synchronization errors + if (executor.hasError()) { + interruptAndShutdownExecutors(executor); + } + + executor.cleanUp(); } - closeWriters(); } } diff --git a/src/main/java/picard/util/ThreadPoolExecutorWithExceptions.java b/src/main/java/picard/util/ThreadPoolExecutorWithExceptions.java index 73de0f3b7e..76df2d189c 100644 --- a/src/main/java/picard/util/ThreadPoolExecutorWithExceptions.java +++ b/src/main/java/picard/util/ThreadPoolExecutorWithExceptions.java @@ -61,4 +61,13 @@ protected void beforeExecute(Thread t, Runnable r) { public boolean hasError() { return exception != null; } + + /** + * Calls `shutdownNow, adjusts the core size to 0 and low timeout to ensure threads get killed and garbage collected. + */ + public void cleanUp() { + shutdownNow(); + setCorePoolSize(0); + setKeepAliveTime(1, TimeUnit.MINUTES); + } }