Skip to content

Commit

Permalink
Jc fix small data race condition (#1690)
Browse files Browse the repository at this point in the history
* [bugfix] Pass MAX_RECORDS_IN_RAM to BasecallsConverter
* Use a different thread pool executor for each tile to ensure no race conditions can occur
  • Loading branch information
Jay Carey authored Jun 8, 2021
1 parent 8444642 commit 77b9159
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 36 deletions.
56 changes: 20 additions & 36 deletions src/main/java/picard/illumina/SortedBasecallsConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* SortedBasecallsConverter utilizes an underlying IlluminaDataProvider to convert parsed and decoded sequencing data
Expand All @@ -44,11 +43,10 @@ public class SortedBasecallsConverter<CLUSTER_OUTPUT_RECORD> extends BasecallsCo
private final int maxReadsInRamPerTile;
private final List<File> tmpDirs;
private final Map<Integer, List<? extends Runnable>> completedWork = new ConcurrentHashMap<>();
private final ThreadPoolExecutorWithExceptions tileWriteExecutor;
private final ThreadPoolExecutorWithExceptions tileReadExecutor;
private final ProgressLogger readProgressLogger = new ProgressLogger(log, 1000000, "Read");
private final ProgressLogger writeProgressLogger = new ProgressLogger(log, 1000000, "Write");
private final AtomicInteger tileWriteJobs = new AtomicInteger(0);
private final Integer numThreads;

/**
* Constructs a new SortedBaseCallsConverter.
Expand Down Expand Up @@ -104,7 +102,7 @@ protected SortedBasecallsConverter(
this.codecPrototype = codecPrototype;
this.outputRecordComparator = outputRecordComparator;
this.outputRecordClass = outputRecordClass;
tileWriteExecutor = new ThreadPoolExecutorWithExceptions(numThreads);
this.numThreads = numThreads;
tileReadExecutor = new ThreadPoolExecutorWithExceptions(numThreads);
}

Expand Down Expand Up @@ -141,19 +139,11 @@ private class SortedRecordToWriterPump implements Runnable {

@Override
public void run() {
tileWriteJobs.incrementAndGet();
for (final CLUSTER_OUTPUT_RECORD record : recordCollection) {
writer.write(record);
writeProgressLogger.record(null, 0);
}
recordCollection.cleanup();
int writeJobsRemaining = tileWriteJobs.decrementAndGet();

if (writeJobsRemaining == 0) {
synchronized (tileWriteJobs) {
tileWriteJobs.notifyAll();
}
}
}
}

Expand Down Expand Up @@ -234,38 +224,32 @@ private synchronized SortingCollection<CLUSTER_OUTPUT_RECORD> createSortingColle
protected void awaitTileProcessingCompletion() throws IOException {
tileReadExecutor.shutdown();
// Wait for all the read threads to complete before checking for errors
ThreadPoolExecutorUtil.awaitThreadPoolTermination("Reading executor", tileReadExecutor, Duration.ofMinutes(5));

// Check for reading errors
if (tileReadExecutor.hasError()) {
interruptAndShutdownExecutors(tileReadExecutor, tileWriteExecutor);
}
awaitExecutor(tileReadExecutor);

int tileProcessingIndex = 0;

ThreadPoolExecutorWithExceptions tileWriteExecutor = null;
while (tileProcessingIndex < tiles.size()) {
if (tileWriteJobs.get() == 0) {
awaitExecutor(tileWriteExecutor);
tileWriteExecutor = new ThreadPoolExecutorWithExceptions(numThreads);
completedWork.get(tiles.get(tileProcessingIndex)).forEach(tileWriteExecutor::submit);
tileProcessingIndex++;
try {
synchronized (tileWriteJobs) {
tileWriteJobs.wait();
// Short sleep to ensure data is flushed.
Thread.sleep(500);
}
} catch (InterruptedException e) {
throw new PicardException("Error waiting for thread lock during tile processing.", e);
}
}
}

tileWriteExecutor.shutdown();
ThreadPoolExecutorUtil.awaitThreadPoolTermination("Writing executor", tileWriteExecutor, Duration.ofMinutes(5));
awaitExecutor(tileWriteExecutor);
closeWriters();
}

private void awaitExecutor(ThreadPoolExecutorWithExceptions executor) {
if (executor != null) {
executor.shutdown();
ThreadPoolExecutorUtil.awaitThreadPoolTermination("Writing executor", executor, Duration.ofMinutes(5));

// Check for tile work synchronization errors
if (tileWriteExecutor.hasError()) {
interruptAndShutdownExecutors(tileWriteExecutor);
// Check for tile work synchronization errors
if (executor.hasError()) {
interruptAndShutdownExecutors(executor);
}

executor.cleanUp();
}
closeWriters();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ protected void beforeExecute(Thread t, Runnable r) {
public boolean hasError() {
return exception != null;
}

/**
* Calls `shutdownNow, adjusts the core size to 0 and low timeout to ensure threads get killed and garbage collected.
*/
public void cleanUp() {
shutdownNow();
setCorePoolSize(0);
setKeepAliveTime(1, TimeUnit.MINUTES);
}
}

0 comments on commit 77b9159

Please sign in to comment.