[NYS2AWS-134] second attempt transactions merger #70

Merged · 21 commits · Feb 11, 2025
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,15 @@ Version template:

# Alfresco Health Processor Changelog

## [0.7.0] - 2025-02-06

### Added
* Added the SolrUndersizedTransactionsHealthProcessorPlugin,
which merges undersized transactions before they are indexed by Solr
to improve performance.
* Added the txn-aggregation indexing strategy, which multi-threads the retrieval
and batching of transactions to improve performance.

## [0.6.1] - 2025-01-08

### Fixed
42 changes: 42 additions & 0 deletions README.md
@@ -149,6 +149,28 @@ eu.xenit.alfresco.healthprocessor.indexing.last-txns.lookback-transactions=10000
eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size=5000
```

#### Multi-threaded txn-aggregation strategy

Strategy id: `txn-aggregation`

Creates a single fetcher thread that goes over all transactions in the range between `txn-aggregation.start` and `txn-aggregation.end` (-1 can be used to indicate all transactions).
It also creates `transactions-background-workers` worker threads that process these transactions in parallel, in batches of `transactions-batch-size` transactions.

Transactions that already have at least `threshold` nodes linked to them are ignored.
The nodes of the remaining transactions are added to the worker's bucket.
Once the size of the bucket reaches the `threshold` value, a copy of the bucket
is scheduled for delivery by the indexing strategy.
The original bucket is then cleared and can be filled with new nodes.
A minimal sketch of this bucketing behaviour is shown after the configuration example below.

```properties
eu.xenit.alfresco.healthprocessor.indexing.strategy=txn-aggregation
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers=5
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start=-1
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end=-1
```
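
The following is a minimal, illustrative sketch of the bucketing behaviour described above. It is not the actual implementation; the class, field, and method names are hypothetical, and node identifiers are simplified to `Long` values.

```java
// Hypothetical, simplified sketch of a single worker's bucketing loop.
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

class BucketingWorkerSketch {

    private final int threshold;                       // txn-aggregation.threshold
    private final Set<Long> bucket = new HashSet<>();  // nodes collected so far
    private final BlockingQueue<Set<Long>> outbound;   // batches handed to the indexing strategy

    BucketingWorkerSketch(int threshold, BlockingQueue<Set<Long>> outbound) {
        this.threshold = threshold;
        this.outbound = outbound;
    }

    void process(List<Long> transactionNodeIds) throws InterruptedException {
        // Transactions that already carry at least `threshold` nodes are skipped entirely.
        if (transactionNodeIds.size() >= threshold) {
            return;
        }
        bucket.addAll(transactionNodeIds);
        if (bucket.size() >= threshold) {
            // Hand a copy of the bucket to the indexing strategy and start over.
            outbound.put(new HashSet<>(bucket));
            bucket.clear();
        }
    }
}
```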

### HealthProcessorPlugin implementations

#### Content Validation
@@ -255,6 +277,26 @@ All nodes for which `PURGE` and `REINDEX` commands have been successfully sent
> Currently, this case cannot be detected automatically, but a node that does not become _HEALTHY_ in a subsequent
> Health-Processor run should be investigated to determine why it is not being indexed.

#### Solr undersized transactions merger

Activation property: `eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled=true`

<b>Note: this plugin can only be used in combination with the `txn-aggregation` indexing strategy.
If this indexing strategy is not selected while the plugin is enabled, the health processor platform will not boot.</b>

This plugin merges transactions that are too small to be indexed efficiently by Solr.
<br>
The plugin unconditionally marks all received nodeRefs as healthy.
The actual merge operation is performed in one or more separate background threads.
The number of threads can be configured; however, 1-2 threads should be sufficient for most setups
because the merge operation itself is fast.
An illustrative sketch of this report-then-merge pattern follows the properties below.

Properties:
```properties
eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled=true
eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.threads=1
```
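
Below is an illustrative sketch of the report-then-merge pattern described above, not the plugin's actual implementation. The class and the helper method are hypothetical; `NodeHealthReport.ofHealthy` is the static factory introduced in this pull request, and the import paths are assumed from the project layout.

```java
// Illustrative sketch only: report every received nodeRef as healthy and hand the
// actual merge work off to a small background thread pool.
import eu.xenit.alfresco.healthprocessor.reporter.api.NodeHealthReport;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.alfresco.service.cmr.repository.NodeRef;

class TransactionMergerSketch {

    // solr-transaction-merger.threads controls the pool size; 1-2 threads is usually enough.
    private final ExecutorService mergers = Executors.newFixedThreadPool(1);

    Set<NodeHealthReport> process(Set<NodeRef> nodeRefs) {
        // The merge itself runs asynchronously, outside the health-processor cycle.
        mergers.submit(() -> mergeTransactionsOf(nodeRefs)); // hypothetical helper
        // Every received nodeRef is reported healthy unconditionally.
        return NodeHealthReport.ofHealthy(nodeRefs, "Transaction merge scheduled.");
    }

    private void mergeTransactionsOf(Set<NodeRef> nodeRefs) {
        // Placeholder for the actual transaction merge against the repository database.
    }
}
```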

### HealthReporter implementations

#### Alfred Telemetry
@@ -20,6 +20,14 @@ public abstract class ToggleableHealthProcessorPlugin implements HealthProcessor
@Setter
private boolean enabled;

protected ToggleableHealthProcessorPlugin(boolean enabled) {
this.enabled = enabled;
}

public ToggleableHealthProcessorPlugin() {
this(false);
}

protected Logger getLogger() {
return log;
}
@@ -16,10 +16,12 @@
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.Value;
import org.alfresco.service.cmr.repository.NodeRef;
@@ -145,6 +147,16 @@ public NodeHealthReport withoutUnpersistableData() {
return nodeHealthReport;
}

public static @NonNull Set<NodeHealthReport> of(@NonNull NodeHealthStatus status, @NonNull Collection<NodeRef> nodeRefs, @NonNull String... messages) {
return nodeRefs.stream()
.map(nodeRef -> new NodeHealthReport(status, nodeRef, messages))
.collect(Collectors.toSet());
}

public static @NonNull Set<NodeHealthReport> ofHealthy(@NonNull Collection<NodeRef> nodeRefs, @NonNull String... messages) {
return of(NodeHealthStatus.HEALTHY, nodeRefs, messages);
}

/**
* Classes that implement this interface and are added to a health report in the {@link #data(Class)} set are stored
* in the database and can be accessed after a cycle has finished in {@link HealthReporter#onCycleDone(List)}
@@ -27,13 +27,24 @@
value="${eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration"
autowire-candidate="false">
<constructor-arg name="transactionsBackgroundWorkers" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers}" />
<constructor-arg name="transactionsBatchSize" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size}" />
<constructor-arg name="threshold" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold}" />
<constructor-arg name="minTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start}" />
<constructor-arg name="maxTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfigurationFactoryBean">
<constructor-arg name="indexingStrategyKey" ref="eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy.IndexingStrategyKey" />
<constructor-arg name="configurations">
<list value-type="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration">
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration" />
</list>
</constructor-arg>
</bean>
@@ -53,5 +64,9 @@
<constructor-arg name="configuration" ref="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration" />
<constructor-arg name="trackingComponent" ref="eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent" />
<constructor-arg name="attributeStore" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore" />
<constructor-arg name="searchTrackingComponent" ref="searchTrackingComponent" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="dataSource" ref="dataSource" />
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>
</beans>
@@ -19,6 +19,16 @@
value="${eu.xenit.alfresco.healthprocessor.plugin.content-validation.properties}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.plugins.solr.SolrUndersizedTransactionsHealthProcessorPlugin"
class="eu.xenit.alfresco.healthprocessor.plugins.solr.SolrUndersizedTransactionsHealthProcessorPlugin">
<constructor-arg name="enabled" value="${eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled}" />
<constructor-arg name="mergerThreads" value="${eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.threads}" />
<constructor-arg name="properties" ref="global-properties" />
<constructor-arg name="transactionHelper" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoTransactionHelper" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="meterRegistry" ref="meterRegistry" />
</bean>

<!-- Solr validation -->
<bean class="eu.xenit.alfresco.healthprocessor.plugins.solr.endpoint.SearchEndpointSelectorBeanPostProcessor" autowire="constructor">
<constructor-arg name="globalProperties" ref="global-properties" />
@@ -42,7 +42,8 @@ default CycleProgress getCycleProgress() {

enum IndexingStrategyKey {
TXNID("txn-id"),
LAST_TXNS("last-txns");
LAST_TXNS("last-txns"),
TXN_AGGREGATION("txn-aggregation");

@Getter
private final String key;
@@ -2,18 +2,29 @@

import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.util.AttributeStore;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import javax.sql.DataSource;

@AllArgsConstructor
public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<IndexingStrategy> {

private final IndexingConfiguration configuration;
private final TrackingComponent trackingComponent;
private final AttributeStore attributeStore;
private final SearchTrackingComponent searchTrackingComponent;
private final AbstractNodeDAOImpl nodeDAO;
private final DataSource dataSource;
private final MeterRegistry meterRegistry;

@Override
public Class<?> getObjectType() {
@@ -31,6 +42,9 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
return new TxnIdBasedIndexingStrategy((TxnIdIndexingConfiguration) configuration, trackingComponent, attributeStore);
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case TXN_AGGREGATION:
return new TransactionAggregationIndexingStrategy((TransactionAggregationIndexingStrategyConfiguration) configuration, nodeDAO,
searchTrackingComponent, dataSource, meterRegistry);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
@@ -0,0 +1,145 @@
package eu.xenit.alfresco.healthprocessor.indexing.txnaggregation;

import eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.NullCycleProgress;
import eu.xenit.alfresco.healthprocessor.indexing.SimpleCycleProgress;
import eu.xenit.alfresco.healthprocessor.reporter.api.CycleProgress;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.lang.Nullable;

import javax.sql.DataSource;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

@Slf4j
public class TransactionAggregationIndexingStrategy implements IndexingStrategy, MeterBinder {

private final @NonNull TransactionAggregationIndexingStrategyConfiguration configuration;
private final @NonNull AbstractNodeDAOImpl nodeDAO;
private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull TransactionAggregationIndexingStrategyState state = new TransactionAggregationIndexingStrategyState();
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdFetcher transactionIdFetcher;
private final @NonNull BlockingDeque<@NonNull Set<@NonNull NodeRef>> queuedNodes;
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdMerger @NonNull [] transactionIdMergers;
private final @NonNull HashSet<@NonNull Thread> runningThreads;
private final @NonNull AtomicReference<@NonNull CycleProgress> cycleProgress = new AtomicReference<>(NullCycleProgress.getInstance());
private final @NonNull LongSupplier progressReporter = state::getCurrentTransactionId;

public TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@Nullable MeterRegistry meterRegistry) {
this(configuration, nodeDAO, searchTrackingComponent, new JdbcTemplate(dataSource), meterRegistry);
}

TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate) {
this(configuration, nodeDAO, searchTrackingComponent, jdbcTemplate, null);
}

TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate,
@Nullable MeterRegistry meterRegistry) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

this.configuration = configuration;
this.searchTrackingComponent = searchTrackingComponent;
this.nodeDAO = nodeDAO;

this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
this.transactionIdFetcher = new TransactionAggregationIndexingStrategyTransactionIdFetcher(configuration, jdbcTemplate, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());

this.transactionIdMergers = new TransactionAggregationIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++)
this.transactionIdMergers[i] = new TransactionAggregationIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);

if (meterRegistry != null) bindTo(meterRegistry);
}

@Override
public void onStart() {
state.setCurrentTransactionId(Math.max(configuration.getMinTransactionId(), nodeDAO.getMinTxnId()));
state.setMaxTransactionId(Math.min(configuration.getMaxTransactionId() >= 0? configuration.getMaxTransactionId() : Long.MAX_VALUE, searchTrackingComponent.getMaxTxnId()));
cycleProgress.set(new SimpleCycleProgress(state.getCurrentTransactionId(), state.getMaxTransactionId(), progressReporter));
log.debug("Starting the TransactionAggregationIndexingStrategy with currentTransactionId ({}) and maxTransactionId ({}).", state.getCurrentTransactionId(), state.getMaxTransactionId());

Thread fetcherThread = new Thread(transactionIdFetcher);
fetcherThread.setName("TransactionAggregationIndexingStrategyTransactionIdFetcher");
runningThreads.add(fetcherThread);
for (int i = 0; i < transactionIdMergers.length; i++) {
TransactionAggregationIndexingStrategyTransactionIdMerger merger = transactionIdMergers[i];
Thread mergerThread = new Thread(merger);
mergerThread.setName(String.format("TransactionAggregationIndexingStrategyTransactionIdMerger-%d", i));
runningThreads.add(mergerThread);
}
for (Thread thread : runningThreads) thread.start();
state.setRunningTransactionMergers(transactionIdMergers.length);

log.debug("Started ({}) background thread(s), of which ({}) transaction merger(s).", runningThreads.size(), state.getRunningTransactionMergers());
}

@Override
@SneakyThrows(InterruptedException.class)
public @NonNull Set<@NonNull NodeRef> getNextNodeIds(int amount) {
Set<NodeRef> returnValue = Set.of();
while (state.getRunningTransactionMergers() > 0) {
returnValue = queuedNodes.takeFirst();
if (returnValue.isEmpty()) {
state.decrementRunningTransactionMergers();
log.debug("Received an empty batch of NodeRefs, which indicate a halted transaction merger. Remaining transaction mergers is ({}).", state.getRunningTransactionMergers());
} else break;
}

return returnValue;
}

@Override
public void onStop() {
log.debug("Stopping the TransactionAggregationIndexingStrategy.");
for (Thread thread : runningThreads) thread.interrupt();
runningThreads.clear();

state.setCurrentTransactionId(-1);
// I'm leaving maxTransactionId as-is. Gives a nice indicator where the previous iteration finished.
cycleProgress.set(NullCycleProgress.getInstance());
}

@Override
public @NonNull Map<@NonNull String, @NonNull String> getState() {
return state.getMapRepresentation();
}

@Override
public @NonNull CycleProgress getCycleProgress() {
return cycleProgress.get();
}

@Override
public void bindTo(MeterRegistry registry) {
state.bindTo(registry);
transactionIdFetcher.bindTo(registry);

registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.queued-nodes", queuedNodes, BlockingDeque::size);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.running-background-threads", runningThreads, value -> value.stream().filter(Thread::isAlive).count());
}
}