Skip to content

Commit

Permalink
IGNITE-22733 Introduce TransactionChanges and abstract transactional …
Browse files Browse the repository at this point in the history
…test
  • Loading branch information
nizhikov committed Nov 21, 2024
1 parent ffa6850 commit 00fe66b
Show file tree
Hide file tree
Showing 12 changed files with 926 additions and 731 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +41,7 @@
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
Expand All @@ -61,7 +61,6 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -346,13 +345,13 @@ public static Collection<QueryTxEntry> transactionChanges(
* @return First, set of object changed in transaction, second, list of transaction data in required format.
* @param <R> Required type.
*/
public <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionChanges(
public <R> TransactionChanges<R> transactionChanges(
int cacheId,
int[] parts,
Function<CacheDataRow, R> mapper
) {
if (F.isEmpty(qryTxEntries))
return F.t(Collections.emptySet(), Collections.emptyList());
return TransactionChanges.empty();

// Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small.
if (parts != null && !F.isSorted(parts))
Expand Down Expand Up @@ -386,7 +385,7 @@ public <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionChanges(
}
}

return F.t(changedKeys, newAndUpdatedRows);
return new TransactionChanges<>(changedKeys, newAndUpdatedRows);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
Expand All @@ -39,9 +38,9 @@
import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InlineIO;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.TransformRangeIterable;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
Expand All @@ -50,7 +49,6 @@
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -81,7 +79,7 @@ public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
* First, set of keys changed (inserted, updated or removed) inside transaction: must be skiped during index scan.
* Second, list of rows inserted or updated inside transaction: must be mixed with the scan results.
*/
private final IgniteBiTuple<Set<KeyCacheObject>, List<IndexRow>> txChanges;
private final TransactionChanges<IndexRow> txChanges;

/**
* @param ectx Execution context.
Expand Down Expand Up @@ -125,7 +123,7 @@ public IndexScan(
r -> new IndexRowImpl(rowHnd, r)
);

txChanges.get2().sort(this::compare);
txChanges.newAndUpdatedEntries().sort(this::compare);
}
else
txChanges = null;
Expand Down Expand Up @@ -257,7 +255,7 @@ protected IndexQueryContext indexQueryContext() {

InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();

InlineIndexRowFactory rowFactory = (isInlineScan() && (txChanges == null || F.isEmpty(txChanges.get1()))) ?
InlineIndexRowFactory rowFactory = (isInlineScan() && (txChanges == null || F.isEmpty(txChanges.changedKeys()))) ?
new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null;

BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = isInlineScan() ? null : createNotExpiredRowFilter();
Expand Down Expand Up @@ -429,9 +427,9 @@ protected TxAwareTreeIndexWrapper(TreeIndex<IndexRow> delegate) {
new GridCursor[]{
// This call will change `txChanges.get1()` content.
// Removing found key from set more efficient so we break some rules here.
new KeyFilteringCursor<>(idxCursor, txChanges.get1(), r -> r.cacheDataRow().key()),
new KeyFilteringCursor<>(idxCursor, txChanges.changedKeys(), r -> r.cacheDataRow().key()),
new SortedListRangeCursor<>(
IndexScan.this::compare, txChanges.get2(), lower, upper, lowerInclude, upperInclude)
IndexScan.this::compare, txChanges.newAndUpdatedEntries(), lower, upper, lowerInclude, upperInclude)
},
idx.indexDefinition()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,20 @@
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;

/** */
Expand Down Expand Up @@ -70,7 +67,7 @@ private class IteratorImpl extends GridIteratorAdapter<Row> {
* First, set of keys changed (inserted, updated or removed) inside transaction: must be skiped during index scan.
* Second, list of rows inserted or updated inside transaction: must be mixed with the scan results.
*/
private IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges;
private TransactionChanges<CacheDataRow> txChanges;

/** */
private Iterator<CacheDataRow> txIter = Collections.emptyIterator();
Expand Down Expand Up @@ -139,10 +136,10 @@ private void advance() throws IgniteCheckedException {
if (txChanges != null) {
// This call will change `txChanges.get1()` content.
// Removing found key from set more efficient so we break some rules here.
if (!F.isEmpty(txChanges.get1()))
cur = new KeyFilteringCursor<>(cur, txChanges.get1(), CacheSearchRow::key);
if (!F.isEmpty(txChanges.changedKeys()))
cur = new KeyFilteringCursor<>(cur, txChanges.changedKeys(), CacheSearchRow::key);

txIter = F.iterator0(txChanges.get2(), true, e -> e.key().partition() == part.id());
txIter = F.iterator0(txChanges.newAndUpdatedEntries(), true, e -> e.key().partition() == part.id());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.IndexFirstLastScan;
import org.apache.ignite.internal.processors.query.calcite.exec.IndexScan;
Expand All @@ -58,7 +59,6 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -175,18 +175,18 @@ public Index queryIndex() {
long cnt = 0;

if (!F.isEmpty(ectx.getQryTxEntries())) {
IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = ectx.transactionChanges(
TransactionChanges<CacheDataRow> txChanges = ectx.transactionChanges(
iidx.indexDefinition().cacheInfo().cacheId(),
locParts,
Function.identity()
);

if (!txChanges.get1().isEmpty()) {
if (!txChanges.changedKeys().isEmpty()) {
// This call will change `txChanges.get1()` content.
// Removing found key from set more efficient so we break some rules here.
rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.get1());
rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.changedKeys());

cnt = countTransactionRows(notNull, iidx, txChanges.get2());
cnt = countTransactionRows(notNull, iidx, txChanges.newAndUpdatedEntries());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.testframework.SupplierX;
import org.apache.ignite.transactions.Transaction;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -212,22 +213,6 @@ public String atomicity() {
return "atomicity=" + (sqlTxMode == SqlTransactionMode.NONE ? CacheAtomicityMode.ATOMIC : CacheAtomicityMode.TRANSACTIONAL);
}

/** */
public interface SupplierX<T> {
/** */
T getx() throws Exception;

/** */
default T get() {
try {
return getx();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

/** */
public enum SqlTransactionMode {
/** All put, remove and SQL dml will be executed inside transaction. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctions;
import org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.SupplierX;
import org.junit.Test;

import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.SupplierX;
import org.junit.Test;

/**
Expand Down
Loading

0 comments on commit 00fe66b

Please sign in to comment.