Skip to content

Commit

Permalink
IGNITE-23138 intermediate results
Browse files Browse the repository at this point in the history
  • Loading branch information
superminkfan committed Nov 3, 2024
1 parent 03bbc2d commit 7121b19
Showing 1 changed file with 51 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.ignite.internal.processors.cache.transactions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
Expand All @@ -26,7 +27,6 @@
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
Expand All @@ -36,13 +36,9 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
Expand All @@ -62,9 +58,6 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
/** Client flag. */
private boolean client;

/** Near cache flag. */
private boolean nearCache;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
Expand All @@ -86,8 +79,6 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {

cfg.setCommunicationSpi(commSpi);

cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));

if (client) {
cfg.setConsistentId("Client");

Expand All @@ -98,8 +89,8 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
}

/** */
protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name)
protected CacheConfiguration<?, ?> getCacheConfiguration(boolean nearCache) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setAffinity(new RendezvousAffinityFunction(false, 16))
Expand Down Expand Up @@ -134,7 +125,7 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -145,9 +136,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Except
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, true);
}

/**
Expand All @@ -156,7 +145,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Ex
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -165,9 +154,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -176,7 +163,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -185,9 +172,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -196,7 +181,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -205,9 +190,7 @@ public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Excepti
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, true);
}

/** Tests metric correct results while tx collisions occured.
Expand All @@ -216,120 +199,78 @@ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exc
* @param isolation Isolation level.
* @throws Exception If failed.
*/
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean nearCashe)
throws Exception {
Ignite ig = startGridsMultiThreaded(3);

int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5;

CountDownLatch txLatch = new CountDownLatch(contCnt);

ig.cluster().state(ClusterState.ACTIVE);

client = true;

Ignite cl = startGrid();

IgniteTransactions cliTxMgr = cl.transactions();

IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);

IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);

final Integer keyId = primaryKey(cache);
IgniteCache<Integer, Integer> cache = (IgniteCache<Integer, Integer>)
cl.getOrCreateCache(getCacheConfiguration(nearCashe));

CountDownLatch blockOnce = new CountDownLatch(1);

for (Ignite ig0 : G.allGrids()) {
if (ig0.configuration().isClientMode())
continue;

TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();

commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) {
blockOnce.countDown();

return true;
}

return false;
}
});
}
IgniteCache<Integer, Integer> cache0 = (IgniteCache<Integer, Integer>)
ig.getOrCreateCache(getCacheConfiguration(nearCashe));

IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);
tx.commit();
}
});
final Integer keyId = primaryKey(cache0);

blockOnce.await();
IgniteTransactions transactions = cl.transactions();

GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();

for (int i = 0; i < contCnt; ++i) {
IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);
AtomicBoolean doTest = new AtomicBoolean(true);

tx.commit();
for (int i = 0; i < contCnt; ++i) {
IgniteInternalFuture f0 = GridTestUtils.runMultiThreadedAsync(() -> {
while (doTest.get()) {
try (Transaction tx = transactions.txStart(concurrency, isolation)) {
cache.put(keyId, 0);

txLatch.countDown();
tx.commit();
}
}
});
}, 1, "threadName");

finishFut.add(f0);
}

finishFut.markInitialized();

for (Ignite ig0 : G.allGrids()) {
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();

if (ig0.configuration().isClientMode())
continue;

commSpi0.stopBlock();
}

IgniteTxManager srvTxMgr = ((IgniteEx)ig).context().cache().context().tm();

assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics();
try {
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

String coll1 = metrics.getTxKeyCollisions();
CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics();

if (!coll1.isEmpty()) {
String coll2 = metrics.getTxKeyCollisions();
String coll = metrics.getTxKeyCollisions();

// check idempotent
assertEquals(coll1, coll2);
if (!coll.isEmpty()) {
assertTrue(coll.contains("queueSize"));

assertTrue(coll1.contains("queueSize"));
return true;
}

return true;
}
else
return false;
}
}, 10_000));

f.get();

finishFut.get();
}
}, getTestTimeout()));
}
finally {
doTest.set(false);

txLatch.await();
finishFut.get();
}
}
}

0 comments on commit 7121b19

Please sign in to comment.