Skip to content

Commit

Permalink
[NYS2AWS-134] bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pvriel committed Feb 10, 2025
1 parent 8562088 commit a13ce9c
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.service.cmr.repository.NodeRef;
Expand All @@ -22,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand All @@ -33,6 +36,7 @@ public class SolrUndersizedTransactionsHealthProcessorPlugin extends ToggleableH
private static final @NonNull String QUEUED_MERGE_REQUESTS_STATE_KEY = "queued-merge-requests";

private final @NonNull TransactionHelper transactionHelper;
private final int mergerThreads;
private final @NonNull AtomicInteger queuedMergeRequests = new AtomicInteger(0);
private final @NonNull ExecutorService mergerExecutor;
private final @NonNull AbstractNodeDAOImpl nodeDAO;
Expand All @@ -47,6 +51,7 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int merg
if (enabled) guaranteeThresholdIndexerIsUsed(properties);

this.transactionHelper = transactionHelper;
this.mergerThreads = mergerThreads;
this.mergerExecutor = Executors.newFixedThreadPool(mergerThreads);
this.nodeDAO = nodeDAO;

Expand All @@ -58,11 +63,23 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int merg

@Nonnull
@Override
@SneakyThrows(InterruptedException.class)
@Synchronized("queuedMergeRequests")
protected Set<@NonNull NodeHealthReport> doProcess(Set<@NonNull NodeRef> allNodeRefs) {
/*
Problem: if the database can not keep up with the amount of transactions we are generating here,
we will eventually run out of memory (due to the increasing queue size).
Solution: keep track of the amount of queued merge requests and limit the amount of queued merge requests with a lock.
*/
if (queuedMergeRequests.get() >= mergerThreads) {
log.debug("The maximum amount of ({}) merge requests is already queued. Waiting for a merge request to finish.", mergerThreads);
queuedMergeRequests.wait();
}

// This is NOT just a logging statement; the counter is incremented! Do not remove!
mergerExecutor.submit(() -> mergeTransactions(allNodeRefs));
log.debug("Queueing a new batch of ({}) transaction(s). Currently, there are ({}) merge requests queued.",
allNodeRefs.size(), queuedMergeRequests.incrementAndGet());
mergerExecutor.submit(() -> mergeTransactions(allNodeRefs));
return NodeHealthReport.ofHealthy(allNodeRefs);
}

Expand All @@ -78,7 +95,11 @@ private void mergeTransactions(@NonNull Set<@NonNull NodeRef> backgroundWorkerBa
} catch (Exception e) {
log.error("An error occurred while merging a batch of ({}) node(s).", backgroundWorkerBatch.size(), e);
} finally {
queuedMergeRequests.decrementAndGet();
synchronized (queuedMergeRequests) {
queuedMergeRequests.decrementAndGet();
// Give a notification to the next waiting thread that there is now space in the queue.
queuedMergeRequests.notify();
}
}
}

Expand Down
Loading

0 comments on commit a13ce9c

Please sign in to comment.