Skip to content

Commit

Permalink
Add new parallel merge task executor for parallel actions within a si…
Browse files Browse the repository at this point in the history
…ngle merge action (#13124)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626
  • Loading branch information
benwtrent committed Mar 14, 2024
1 parent cb318af commit fb77357
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 50 deletions.
4 changes: 4 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ Improvements

* GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov)

* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
executor is provided. (Ben Trent)

Optimizations
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -163,7 +165,8 @@ public Lucene99HnswVectorsFormat(int maxConn, int beamWidth) {
* @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
* larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
* generated by this format to do the merge
* generated by this format to do the merge. If null, the configured {@link
* MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
*/
public Lucene99HnswVectorsFormat(
int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
Expand All @@ -184,10 +187,6 @@ public Lucene99HnswVectorsFormat(
}
this.maxConn = maxConn;
this.beamWidth = beamWidth;
if (numMergeWorkers > 1 && mergeExec == null) {
throw new IllegalArgumentException(
"No executor service passed in when " + numMergeWorkers + " merge workers are requested");
}
if (numMergeWorkers == 1 && mergeExec != null) {
throw new IllegalArgumentException(
"No executor service is needed as we'll use single thread to merge");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,14 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
HnswGraphMerger merger =
createGraphMerger(
fieldInfo,
scorerSupplier,
mergeState.intraMergeTaskExecutor == null
? null
: new TaskExecutor(mergeState.intraMergeTaskExecutor),
numMergeWorkers);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
Expand Down Expand Up @@ -496,11 +503,23 @@ private void writeMeta(
}

private HnswGraphMerger createGraphMerger(
FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
FieldInfo fieldInfo,
RandomVectorScorerSupplier scorerSupplier,
TaskExecutor parallelMergeTaskExecutor,
int numParallelMergeWorkers) {
if (mergeExec != null) {
return new ConcurrentHnswMerger(
fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
}
if (parallelMergeTaskExecutor != null) {
return new ConcurrentHnswMerger(
fieldInfo,
scorerSupplier,
M,
beamWidth,
parallelMergeTaskExecutor,
numParallelMergeWorkers);
}
return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -109,6 +112,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {

private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;

/** The executor provided for intra-merge parallelization */
protected CachedExecutor intraMergeExecutor;

/** Sole constructor, with all settings set to default values. */
public ConcurrentMergeScheduler() {}

Expand Down Expand Up @@ -259,6 +265,16 @@ synchronized void removeMergeThread() {
assert false : "merge thread " + currentThread + " was not found";
}

@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
assert intraMergeExecutor != null : "scaledExecutor is not initialized";
// don't do multithreaded merges for small merges
if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
return super.getIntraMergeExecutor(merge);
}
return intraMergeExecutor;
}

@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
Expand Down Expand Up @@ -446,7 +462,13 @@ private static String rateToString(double mbPerSec) {

@Override
public void close() {
sync();
try {
sync();
} finally {
if (intraMergeExecutor != null) {
intraMergeExecutor.shutdown();
}
}
}

/**
Expand Down Expand Up @@ -510,6 +532,9 @@ public synchronized int mergeThreadCount() {
void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
if (intraMergeExecutor == null) {
intraMergeExecutor = new CachedExecutor();
}
}

@Override
Expand Down Expand Up @@ -755,11 +780,16 @@ void clearSuppressExceptions() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString();
return getClass().getSimpleName()
+ ": "
+ "maxThreadCount="
+ maxThreadCount
+ ", "
+ "maxMergeCount="
+ maxMergeCount
+ ", "
+ "ioThrottle="
+ doAutoIOThrottle;
}

private boolean isBacklog(long now, OneMerge merge) {
Expand Down Expand Up @@ -902,12 +932,58 @@ private static String getSegmentName(MergePolicy.OneMerge merge) {
}

static {
TestSecrets.setConcurrentMergeSchedulerAccess(
new ConcurrentMergeSchedulerAccess() {
@Override
public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
cms.setSuppressExceptions();
}
});
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}

/**
* This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
* limited number of threads to execute merge tasks. In particular, if the number of
* `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
* the calling thread.
*/
private class CachedExecutor implements Executor {

private final AtomicInteger activeCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;

public CachedExecutor() {
this.executor =
new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
}

void shutdown() {
executor.shutdown();
}

@Override
public void execute(Runnable command) {
final boolean isThreadAvailable;
// we need to check if a thread is available before submitting the task to the executor
// synchronize on CMS to get an accurate count of current threads
synchronized (ConcurrentMergeScheduler.this) {
int max = maxThreadCount - mergeThreads.size() - 1;
int value = activeCount.get();
if (value < max) {
activeCount.incrementAndGet();
assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
isThreadAvailable = true;
} else {
isThreadAvailable = false;
}
}
if (isThreadAvailable) {
executor.execute(
() -> {
try {
command.run();
} finally {
activeCount.decrementAndGet();
assert activeCount.get() >= 0 : "unexpected negative active count";
}
});
} else {
command.run();
}
}
}
}
17 changes: 15 additions & 2 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3439,7 +3439,14 @@ public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException
}

SegmentMerger merger =
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
new SegmentMerger(
readers,
segInfo,
infoStream,
trackingDir,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));

if (!merger.shouldMerge()) {
return;
Expand Down Expand Up @@ -5228,7 +5235,13 @@ public int length() {

final SegmentMerger merger =
new SegmentMerger(
mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
mergeReaders,
merge.info.info,
infoStream,
dirWrapper,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();

Expand Down
13 changes: 13 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;

/**
* Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges
Expand All @@ -32,6 +34,8 @@
*/
public abstract class MergeScheduler implements Closeable {

private final Executor executor = new SameThreadExecutorService();

/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected MergeScheduler() {}

Expand All @@ -52,6 +56,15 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
return in;
}

/**
* Provides an executor for parallelism during a single merge operation. By default, the method
* returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
* calling thread.
*/
public Executor getIntraMergeExecutor(OneMerge merge) {
return executor;
}

/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
Expand Down
11 changes: 10 additions & 1 deletion lucene/core/src/java/org/apache/lucene/index/MergeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
Expand Down Expand Up @@ -84,15 +85,23 @@ public class MergeState {
/** InfoStream for debugging messages. */
public final InfoStream infoStream;

/** Executor for intra merge activity */
public final Executor intraMergeTaskExecutor;

/** Indicates if the index needs to be sorted * */
public boolean needsIndexSort;

/** Sole constructor. */
MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
MergeState(
List<CodecReader> readers,
SegmentInfo segmentInfo,
InfoStream infoStream,
Executor intraMergeTaskExecutor)
throws IOException {
verifyIndexSort(readers, segmentInfo);
this.infoStream = infoStream;
int numReaders = readers.size();
this.intraMergeTaskExecutor = intraMergeTaskExecutor;

maxDocs = new int[numReaders];
fieldsProducers = new FieldsProducer[numReaders];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;

import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;

Expand Down Expand Up @@ -52,4 +53,9 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
public MergeScheduler clone() {
return this;
}

@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
}
}
Loading

0 comments on commit fb77357

Please sign in to comment.