Skip to content

Commit

Permalink
[NYS2AWS-144] added metrics support for threshold indexer & solr undersized transactions merger plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
pvriel committed Feb 7, 2025
1 parent 5cc9f71 commit c4e8d1a
Show file tree
Hide file tree
Showing 12 changed files with 742 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@
<constructor-arg name="attributeStore" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore" />
<constructor-arg name="searchTrackingComponent" ref="searchTrackingComponent" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,7 @@
<constructor-arg name="properties" ref="global-properties" />
<constructor-arg name="transactionHelper" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoTransactionHelper" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
</bean>

<!-- Used by the eu.xenit.alfresco.healthprocessor.plugins.solr.SolrUndersizedTransactionsHealthProcessorPlugin bean -->
<bean id="eu.xenit.alfresco.healthprocessor.dictionaryBootstrap" parent="dictionaryModelBootstrap">
<property name="models">
<list>
<value>alfresco/module/alfresco-health-processor/content-model.xml</value>
</list>
</property>
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>

<!-- Solr validation -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.util.AttributeStore;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
Expand All @@ -20,6 +21,7 @@ public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<Index
private final AttributeStore attributeStore;
private final SearchTrackingComponent searchTrackingComponent;
private final AbstractNodeDAOImpl nodeDAO;
private final MeterRegistry meterRegistry;

@Override
public Class<?> getObjectType() {
Expand All @@ -38,7 +40,7 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case THRESHOLD:
return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent);
return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent, meterRegistry);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import eu.xenit.alfresco.healthprocessor.indexing.NullCycleProgress;
import eu.xenit.alfresco.healthprocessor.indexing.SimpleCycleProgress;
import eu.xenit.alfresco.healthprocessor.reporter.api.CycleProgress;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.springframework.lang.Nullable;

import java.util.HashSet;
import java.util.Map;
Expand All @@ -20,7 +23,7 @@
import java.util.function.LongSupplier;

@Slf4j
public class ThresholdIndexingStrategy implements IndexingStrategy {
public class ThresholdIndexingStrategy implements IndexingStrategy, MeterBinder {

private final @NonNull ThresholdIndexingStrategyConfiguration configuration;
private final @NonNull AbstractNodeDAOImpl nodeDAO;
Expand All @@ -33,9 +36,17 @@ public class ThresholdIndexingStrategy implements IndexingStrategy {
private final @NonNull AtomicReference<@NonNull CycleProgress> cycleProgress = new AtomicReference<>(NullCycleProgress.getInstance());
private final @NonNull LongSupplier progressReporter = state::getCurrentTransactionId;

/**
 * Convenience constructor that creates the strategy without a {@link MeterRegistry}.
 * <p>
 * Delegates to the four-argument constructor with {@code null} as the registry; that constructor
 * only calls {@code bindTo} when a registry is supplied and logs a warning otherwise.
 *
 * @param configuration the configuration of the threshold indexing strategy
 * @param nodeDAO the node DAO handed to the main constructor
 * @param searchTrackingComponent the search tracking component handed to the main constructor
 */
public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent) {
// Used for testing purposes: no MeterRegistry is passed along, so no gauges are registered.
this(configuration, nodeDAO, searchTrackingComponent, null);
}

public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent) {
@NonNull SearchTrackingComponent searchTrackingComponent,
@Nullable MeterRegistry meterRegistry) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

Expand All @@ -49,7 +60,10 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration

this.transactionIdMergers = new ThresholdIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++)
this.transactionIdMergers[i] = new ThresholdIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);
this.transactionIdMergers[i] = new ThresholdIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent, i);

if (meterRegistry != null) bindTo(meterRegistry);
else log.warn("No MeterRegistry provided, so no metrics will be exposed.");
}

@Override
Expand Down Expand Up @@ -107,4 +121,14 @@ public void onStop() {
/**
 * Returns the {@link CycleProgress} currently stored in the {@code cycleProgress} atomic reference.
 * The reference is initialised with the {@code NullCycleProgress} instance.
 *
 * @return the value currently held by the atomic reference
 */
public @NonNull CycleProgress getCycleProgress() {
return cycleProgress.get();
}

/**
 * Registers the gauges of this strategy and of its collaborators on the given registry.
 * <p>
 * The state, the transaction ID fetcher and every transaction ID merger are asked to bind their
 * own meters first. Afterwards two gauges owned by the strategy itself are registered: the size
 * of the queued-nodes deque and the number of background threads that are still alive.
 *
 * @param registry the registry to register the gauges on
 */
@Override
public void bindTo(@NonNull MeterRegistry registry) {
    state.bindTo(registry);
    transactionIdFetcher.bindTo(registry);
    for (ThresholdIndexingStrategyTransactionIdMerger transactionIdMerger : transactionIdMergers) {
        transactionIdMerger.bindTo(registry);
    }

    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.queued-nodes",
            queuedNodes,
            nodes -> nodes.size());
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.running-background-threads",
            runningThreads,
            threads -> threads.stream().filter(thread -> thread.isAlive()).count());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -11,7 +13,7 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ThresholdIndexingStrategyState {
public class ThresholdIndexingStrategyState implements MeterBinder {

private long currentTransactionId = -1;
private long maxTransactionId = -1;
Expand All @@ -31,4 +33,12 @@ public void decrementRunningTransactionMergers() {
runningTransactionMergers--;
}

/**
 * Registers one gauge per piece of state tracked by this object: the current transaction ID,
 * the maximum transaction ID, the number of running transaction mergers and the value of the
 * {@code transactionBatchesQueueSize} counter.
 *
 * @param registry the registry to register the gauges on
 */
@Override
public void bindTo(@NonNull MeterRegistry registry) {
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.current-transaction-id",
            this,
            currentState -> currentState.getCurrentTransactionId());
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.max-transaction-id",
            this,
            currentState -> currentState.getMaxTransactionId());
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.running-transaction-mergers",
            this,
            currentState -> currentState.getRunningTransactionMergers());
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.queued-transaction-batches",
            transactionBatchesQueueSize,
            queueSize -> queueSize.get());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.search.SearchTrackingComponent;
Expand All @@ -16,14 +18,16 @@
* I'm just leaving this note here, in case this might help another developer in the future.
*/
@Slf4j
public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable {
public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable, MeterBinder {

private final @NonNull BlockingDeque<@NonNull List<@NonNull Transaction>> queuedTransactions;

private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull ThresholdIndexingStrategyState state;
private final @NonNull ThresholdIndexingStrategyConfiguration configuration;

private boolean isRunning = false;

public ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull ThresholdIndexingStrategyState state) {
Expand Down Expand Up @@ -51,6 +55,7 @@ public ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingS
@Override
public void run() {
log.debug("Starting the ThresholdIndexingStrategyTransactionIdFetcher.");
isRunning = true;
try {
long currentTransactionId = state.getCurrentTransactionId();
long maxTransactionId = state.getMaxTransactionId();
Expand All @@ -73,6 +78,7 @@ public void run() {
"Trying to recover by signaling the end to the transaction merger(s).", e);
} finally {
try {
isRunning = false;
signalEnd();
} catch (InterruptedException e) {
log.error("The ThresholdIndexingStrategyTransactionIdFetcher has been interrupted while signaling the end to the transaction merger(s). " +
Expand Down Expand Up @@ -110,4 +116,8 @@ private void queueTransactions(@NonNull List<@NonNull Transaction> transactions)
}
}

/**
 * Registers a gauge that reports {@code 1} while the {@code isRunning} flag of this fetcher is
 * set and {@code 0} otherwise.
 *
 * @param registry the registry to register the gauge on
 */
@Override
public void bindTo(@NonNull MeterRegistry registry) {
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.indexing.threshold.transaction-fetcher.running",
            this,
            fetcher -> {
                if (fetcher.isRunning) {
                    return 1;
                }
                return 0;
            });
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -10,16 +13,12 @@
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.StoreRef;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
public class ThresholdIndexingStrategyTransactionIdMerger implements Runnable {
public class ThresholdIndexingStrategyTransactionIdMerger implements Runnable, MeterBinder {

private static final @NonNull Set<@NonNull StoreRef> WORKSPACE_AND_ARCHIVE_STORE_REFS = Set.of(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, StoreRef.STORE_REF_ARCHIVE_SPACESSTORE);

Expand All @@ -28,12 +27,26 @@ public class ThresholdIndexingStrategyTransactionIdMerger implements Runnable {
private final @NonNull ThresholdIndexingStrategyConfiguration configuration;
private final @NonNull NodeParameters nodeParameters = new NodeParameters();
private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull HashSet<NodeRef> bucket;
private final int index;

private @Getter boolean isRunning = false;

public ThresholdIndexingStrategyTransactionIdMerger(@NonNull ThresholdIndexingStrategyTransactionIdFetcher fetcher, @NonNull BlockingDeque<Set<NodeRef>> queuedNodes, @NonNull ThresholdIndexingStrategyConfiguration configuration, @NonNull SearchTrackingComponent searchTrackingComponent, int index) {
this.fetcher = fetcher;
this.queuedNodes = queuedNodes;
this.configuration = configuration;
this.searchTrackingComponent = searchTrackingComponent;
this.index = index;

this.bucket = new HashSet<>(configuration.getThreshold());
}

@Override
public void run() {
try {
log.debug("Starting ({}).", Thread.currentThread().getName());
HashSet<NodeRef> bucket = new HashSet<>(configuration.getThreshold());
log.debug("Starting merger ({}).", index);
isRunning = true;

List<Transaction> newTransactions;
while (!(newTransactions = fetcher.getNextTransactions()).isEmpty()) {
Expand All @@ -55,6 +68,8 @@ public void run() {
log.warn("({}) received an interrupt signal, which was unexpected. Trying to signal the end of the merger thread.", Thread.currentThread().getName(), e);
} finally {
try {
isRunning = false;
bucket.clear();
signalEnd();
} catch (InterruptedException e) {
log.error("({}) received an interrupt signal while trying to signal the end of the merger thread. " +
Expand Down Expand Up @@ -92,4 +107,10 @@ private void signalEnd() throws InterruptedException {
queuedNodes.putLast(Set.of());
}

/**
 * Registers two gauges for this merger, both named after its {@code index}: one reporting
 * {@code 1} while the {@code isRunning} flag is set ({@code 0} otherwise) and one reporting the
 * current size of the bucket.
 *
 * @param registry the registry to register the gauges on
 */
@Override
public void bindTo(@NonNull MeterRegistry registry) {
    String runningGaugeName = String.format("eu.xenit.alfresco.healthprocessor.indexing.threshold.transaction-merger-%s.running", index);
    registry.gauge(runningGaugeName, this, merger -> {
        if (merger.isRunning) {
            return 1;
        }
        return 0;
    });

    String bucketSizeGaugeName = String.format("eu.xenit.alfresco.healthprocessor.indexing.threshold.transaction-merger-%s.bucket-size", index);
    registry.gauge(bucketSizeGaugeName, this, merger -> merger.bucket.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import eu.xenit.alfresco.healthprocessor.plugins.api.ToggleableHealthProcessorPlugin;
import eu.xenit.alfresco.healthprocessor.reporter.api.NodeHealthReport;
import eu.xenit.alfresco.healthprocessor.util.TransactionHelper;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.service.cmr.repository.NodeRef;

import org.alfresco.util.Pair;
import org.springframework.lang.Nullable;

import javax.annotation.Nonnull;
import java.util.HashMap;
Expand All @@ -24,7 +27,7 @@
import java.util.stream.Collectors;

@Slf4j
public class SolrUndersizedTransactionsHealthProcessorPlugin extends ToggleableHealthProcessorPlugin {
public class SolrUndersizedTransactionsHealthProcessorPlugin extends ToggleableHealthProcessorPlugin implements MeterBinder {

public static final @NonNull String SELECTED_INDEXER_STRATEGY_PROPERTY = "eu.xenit.alfresco.healthprocessor.indexing.strategy";

Expand All @@ -38,6 +41,15 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int merg
@NonNull Properties properties,
@NonNull TransactionHelper transactionHelper,
@NonNull AbstractNodeDAOImpl nodeDAO) {
// Used for testing purposes.
this(enabled, mergerThreads, properties, transactionHelper, nodeDAO, null);
}

public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int mergerThreads,
@NonNull Properties properties,
@NonNull TransactionHelper transactionHelper,
@NonNull AbstractNodeDAOImpl nodeDAO,
@Nullable MeterRegistry meterRegistry) {
super(enabled);
if (enabled) guaranteeThresholdIndexerIsUsed(properties);

Expand All @@ -47,6 +59,10 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int merg

this.configuration = new HashMap<>(super.getConfiguration());
this.configuration.put("merger-threads", String.valueOf(mergerThreads));

if (meterRegistry != null) bindTo(meterRegistry);
else log.warn("The SolrUndersizedTransactionsHealthProcessorPlugin was not bound to a MeterRegistry. " +
"This means that the queue size will not be reported.");
}

@Nonnull
Expand Down Expand Up @@ -91,4 +107,9 @@ private static void guaranteeThresholdIndexerIsUsed(@NonNull Properties properti
"Please adjust the (%s) property.", expected, property, SELECTED_INDEXER_STRATEGY_PROPERTY));
}

/**
 * Registers a gauge that reports the current value of the {@code queuedMergeRequests} counter.
 *
 * @param registry the registry to register the gauge on
 */
@Override
public void bindTo(@NonNull MeterRegistry registry) {
    registry.gauge(
            "eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.merge-queue-size",
            queuedMergeRequests,
            pendingRequests -> pendingRequests.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ThresholdIndexingStrategyTransactionIdMergerTest() throws InterruptedExce
}).when(searchTrackingComponent).getNodes(any(), any());

merger = new ThresholdIndexingStrategyTransactionIdMerger(fetcher, queuedNodes, configuration,
searchTrackingComponent);
searchTrackingComponent, 0);
}

@BeforeEach
Expand Down
Loading

0 comments on commit c4e8d1a

Please sign in to comment.