Skip to content

Commit

Permalink
Merge pull request #70 from xenit-eu/NYS2AWS-134-second-attempt-trans…
Browse files Browse the repository at this point in the history
…actions-merger

[NYS2AWS-134] second attempt transactions merger
  • Loading branch information
pvriel authored Feb 11, 2025
2 parents 5292927 + ad79a5c commit 49c4188
Show file tree
Hide file tree
Showing 27 changed files with 2,419 additions and 2 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ Version template:

# Alfresco Health Processor Changelog

## [0.7.0] - 2025-02-06

### Added
* Added the SolrUndersizedTransactionsHealthProcessorPlugin,
which merges the transactions before they are being indexed by Solr
to improve performance.
* Added the txn-aggregation indexing strategy, which multi-threads the retrieval
and batching process of transactions to improve performance.

## [0.6.1] - 2025-01-08

### Fixed
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ eu.xenit.alfresco.healthprocessor.indexing.last-txns.lookback-transactions=10000
eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size=5000
```

#### Multi-threaded txn-aggregation strategy

Strategy id: `txn-aggregation`

Creates a single thread that goes over all transactions in the range of `txn-id.start` and `txn-id.end` (-1 can be used to indicate all transactions).
Also creates `transactions-background-workers` threads that process these transactions in batches of `transactions-batch-size` transactions in parallel.

Transactions that already have at least `threshold` nodes linked to them are ignored.
The remaining nodes are added to the worker's cache / bucket.
Once the size of the cache / bucket reaches the `threshold` value, a copy of the bucket
is scheduled for delivery by the indexing strategy.
The original bucket is then cleared and can be filled with new nodes.

```properties
eu.xenit.alfresco.healthprocessor.indexing.strategy=txn-aggregation
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers=5
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start=-1
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end=-1
```

### HealthProcessorPlugin implementations

#### Content Validation
Expand Down Expand Up @@ -255,6 +277,26 @@ All nodes for which an `PURGE` and `REINDEX` commands has been succesfully sent
> Currently, this case can not be detected automatically, but a node that does not become _HEALTHY_ in a subsequent
> Health-Processor run should be investigated why it is not being indexed.
#### Solr undersized transactions merger

Activation property: `eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled=true`

<b>Note: this plugin can only be used in combination with the `txn-aggregation` indexing strategy.
If this indexing strategy is not selected while the plugin is enabled, the health processor platform will not boot.</b>

This plugin is used to merge transactions that are too small to be indexed by Solr in a performant manner.
<br>
This plugin marks all received nodeRefs as healthy, no matter what.
The actual merge operation is done in (a) separate thread(s).
The amount of threads can be configured; however, 1-2 threads should be sufficient for most setups
due to the fast merge operation.

Properties:
```properties
eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled=true
eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.threads=1
```

### HealthReporter implementations

#### Alfred Telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ public abstract class ToggleableHealthProcessorPlugin implements HealthProcessor
@Setter
private boolean enabled;

protected ToggleableHealthProcessorPlugin(boolean enabled) {
this.enabled = enabled;
}

public ToggleableHealthProcessorPlugin() {
this(false);
}

protected Logger getLogger() {
return log;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.Value;
import org.alfresco.service.cmr.repository.NodeRef;
Expand Down Expand Up @@ -145,6 +147,16 @@ public NodeHealthReport withoutUnpersistableData() {
return nodeHealthReport;
}

public static @NonNull Set<NodeHealthReport> of(@NonNull NodeHealthStatus status, @NonNull Collection<NodeRef> nodeRefs, @NonNull String... messages) {
return nodeRefs.stream()
.map(nodeRef -> new NodeHealthReport(status, nodeRef, messages))
.collect(Collectors.toSet());
}

public static @NonNull Set<NodeHealthReport> ofHealthy(@NonNull Collection<NodeRef> nodeRefs, @NonNull String... messages) {
return of(NodeHealthStatus.HEALTHY, nodeRefs, messages);
}

/**
* Classes that implement this interface and are added to a health report in the {@link #data(Class)} set are stored
* in the database and can be accessed after a cycle has finished in {@link HealthReporter#onCycleDone(List)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,24 @@
value="${eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration"
autowire-candidate="false">
<constructor-arg name="transactionsBackgroundWorkers" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers}" />
<constructor-arg name="transactionsBatchSize" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size}" />
<constructor-arg name="threshold" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold}" />
<constructor-arg name="minTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start}" />
<constructor-arg name="maxTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfigurationFactoryBean">
<constructor-arg name="indexingStrategyKey" ref="eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy.IndexingStrategyKey" />
<constructor-arg name="configurations">
<list value-type="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration">
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration" />
</list>
</constructor-arg>
</bean>
Expand All @@ -53,5 +64,9 @@
<constructor-arg name="configuration" ref="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration" />
<constructor-arg name="trackingComponent" ref="eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent" />
<constructor-arg name="attributeStore" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore" />
<constructor-arg name="searchTrackingComponent" ref="searchTrackingComponent" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="dataSource" ref="dataSource" />
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@
value="${eu.xenit.alfresco.healthprocessor.plugin.content-validation.properties}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.plugins.solr.SolrUndersizedTransactionsHealthProcessorPlugin"
class="eu.xenit.alfresco.healthprocessor.plugins.solr.SolrUndersizedTransactionsHealthProcessorPlugin">
<constructor-arg name="enabled" value="${eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled}" />
<constructor-arg name="mergerThreads" value="${eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.threads}" />
<constructor-arg name="properties" ref="global-properties" />
<constructor-arg name="transactionHelper" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoTransactionHelper" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>

<!-- Solr validation -->
<bean class="eu.xenit.alfresco.healthprocessor.plugins.solr.endpoint.SearchEndpointSelectorBeanPostProcessor" autowire="constructor">
<constructor-arg name="globalProperties" ref="global-properties" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ default CycleProgress getCycleProgress() {

enum IndexingStrategyKey {
TXNID("txn-id"),
LAST_TXNS("last-txns");
LAST_TXNS("last-txns"),
TXN_AGGREGATION("txn-aggregation");

@Getter
private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@

import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.util.AttributeStore;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import javax.sql.DataSource;

@AllArgsConstructor
public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<IndexingStrategy> {

private final IndexingConfiguration configuration;
private final TrackingComponent trackingComponent;
private final AttributeStore attributeStore;
private final SearchTrackingComponent searchTrackingComponent;
private final AbstractNodeDAOImpl nodeDAO;
private final DataSource dataSource;
private final MeterRegistry meterRegistry;

@Override
public Class<?> getObjectType() {
Expand All @@ -31,6 +42,9 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
return new TxnIdBasedIndexingStrategy((TxnIdIndexingConfiguration) configuration, trackingComponent, attributeStore);
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case TXN_AGGREGATION:
return new TransactionAggregationIndexingStrategy((TransactionAggregationIndexingStrategyConfiguration) configuration, nodeDAO,
searchTrackingComponent, dataSource, meterRegistry);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package eu.xenit.alfresco.healthprocessor.indexing.txnaggregation;

import eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.NullCycleProgress;
import eu.xenit.alfresco.healthprocessor.indexing.SimpleCycleProgress;
import eu.xenit.alfresco.healthprocessor.reporter.api.CycleProgress;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.lang.Nullable;

import javax.sql.DataSource;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

@Slf4j
public class TransactionAggregationIndexingStrategy implements IndexingStrategy, MeterBinder {

private final @NonNull TransactionAggregationIndexingStrategyConfiguration configuration;
private final @NonNull AbstractNodeDAOImpl nodeDAO;
private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull TransactionAggregationIndexingStrategyState state = new TransactionAggregationIndexingStrategyState();
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdFetcher transactionIdFetcher;
private final @NonNull BlockingDeque<@NonNull Set<@NonNull NodeRef>> queuedNodes;
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdMerger @NonNull [] transactionIdMergers;
private final @NonNull HashSet<@NonNull Thread> runningThreads;
private final @NonNull AtomicReference<@NonNull CycleProgress> cycleProgress = new AtomicReference<>(NullCycleProgress.getInstance());
private final @NonNull LongSupplier progressReporter = state::getCurrentTransactionId;

public TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@Nullable MeterRegistry meterRegistry) {
this(configuration, nodeDAO, searchTrackingComponent, new JdbcTemplate(dataSource), meterRegistry);
}

TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate) {
this(configuration, nodeDAO, searchTrackingComponent, jdbcTemplate, null);
}

TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate,
@Nullable MeterRegistry meterRegistry) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

this.configuration = configuration;
this.searchTrackingComponent = searchTrackingComponent;
this.nodeDAO = nodeDAO;

this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
this.transactionIdFetcher = new TransactionAggregationIndexingStrategyTransactionIdFetcher(configuration, jdbcTemplate, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());

this.transactionIdMergers = new TransactionAggregationIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++)
this.transactionIdMergers[i] = new TransactionAggregationIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);

if (meterRegistry != null) bindTo(meterRegistry);
}

@Override
public void onStart() {
state.setCurrentTransactionId(Math.max(configuration.getMinTransactionId(), nodeDAO.getMinTxnId()));
state.setMaxTransactionId(Math.min(configuration.getMaxTransactionId() >= 0? configuration.getMaxTransactionId() : Long.MAX_VALUE, searchTrackingComponent.getMaxTxnId()));
cycleProgress.set(new SimpleCycleProgress(state.getCurrentTransactionId(), state.getMaxTransactionId(), progressReporter));
log.debug("Starting the TransactionAggregationIndexingStrategy with currentTransactionId ({}) and maxTransactionId ({}).", state.getCurrentTransactionId(), state.getMaxTransactionId());

Thread fetcherThread = new Thread(transactionIdFetcher);
fetcherThread.setName("TransactionAggregationIndexingStrategyTransactionIdFetcher");
runningThreads.add(fetcherThread);
for (int i = 0; i < transactionIdMergers.length; i++) {
TransactionAggregationIndexingStrategyTransactionIdMerger merger = transactionIdMergers[i];
Thread mergerThread = new Thread(merger);
mergerThread.setName(String.format("TransactionAggregationIndexingStrategyTransactionIdMerger-%d", i));
runningThreads.add(new Thread(merger));
}
for (Thread thread : runningThreads) thread.start();
state.setRunningTransactionMergers(transactionIdMergers.length);

log.debug("Started ({}) background thread(s), of which ({}) transaction merger(s).", runningThreads.size(), state.getRunningTransactionMergers());
}

@Override
@SneakyThrows(InterruptedException.class)
public @NonNull Set<@NonNull NodeRef> getNextNodeIds(int amount) {
Set<NodeRef> returnValue = Set.of();
while (state.getRunningTransactionMergers() > 0) {
returnValue = queuedNodes.takeFirst();
if (returnValue.isEmpty()) {
state.decrementRunningTransactionMergers();
log.debug("Received an empty batch of NodeRefs, which indicate a halted transaction merger. Remaining transaction mergers is ({}).", state.getRunningTransactionMergers());
} else break;
}

return returnValue;
}

@Override
public void onStop() {
log.debug("Stopping the TransactionAggregationIndexingStrategy.");
for (Thread thread : runningThreads) thread.interrupt();
runningThreads.clear();

state.setCurrentTransactionId(-1);
// I'm leaving maxTransactionId as-is. Gives a nice indicator where the previous iteration finished.
cycleProgress.set(NullCycleProgress.getInstance());
}

@Override
public @NonNull Map<@NonNull String, @NonNull String> getState() {
return state.getMapRepresentation();
}

@Override
public @NonNull CycleProgress getCycleProgress() {
return cycleProgress.get();
}

@Override
public void bindTo(MeterRegistry registry) {
state.bindTo(registry);
transactionIdFetcher.bindTo(registry);

registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.queued-nodes", queuedNodes, BlockingDeque::size);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.running-background-threads", runningThreads, value -> value.stream().filter(Thread::isAlive).count());
}
}
Loading

0 comments on commit 49c4188

Please sign in to comment.