Skip to content

Commit

Permalink
IGNITE-23138 Fix flaky TxWithKeyContentionSelfTest (Cache 12 group) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
superminkfan authored Nov 18, 2024
1 parent 05f4874 commit 056189c
Showing 1 changed file with 60 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,31 @@

package org.apache.ignite.internal.processors.cache.transactions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
Expand All @@ -59,21 +51,14 @@

/** Tests tx key contention detection functional. */
public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
/** Client flag. */
private boolean client;

/** Near cache flag. */
private boolean nearCache;
/** */
@Rule
public TestName testName = new TestName();

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);

cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));

if (client)
cfg.setClientMode(true);

cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
Expand All @@ -82,24 +67,12 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
)
);

TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();

cfg.setCommunicationSpi(commSpi);

cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));

if (client) {
cfg.setConsistentId("Client");

cfg.setClientMode(client);
}

return cfg;
}

/** */
protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name)
private CacheConfiguration<Integer, Integer> cacheConfiguration(boolean nearCache) {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setAffinity(new RendezvousAffinityFunction(false, 16))
Expand Down Expand Up @@ -134,7 +107,7 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -145,9 +118,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Except
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, true);
}

/**
Expand All @@ -156,7 +127,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Ex
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -165,9 +136,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -176,7 +145,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -185,9 +154,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -196,7 +163,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -205,9 +172,7 @@ public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Excepti
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, true);
}

/** Tests metric correct results while tx collisions occured.
Expand All @@ -216,120 +181,67 @@ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exc
* @param isolation Isolation level.
* @throws Exception If failed.
*/
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean nearCache)
throws Exception {
Ignite ig = startGridsMultiThreaded(3);

int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5;

CountDownLatch txLatch = new CountDownLatch(contCnt);

ig.cluster().state(ClusterState.ACTIVE);

client = true;

Ignite cl = startGrid();

IgniteTransactions cliTxMgr = cl.transactions();
int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 2;

IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
Ignite cl = startClientGrid();

IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Integer> clientCache = cl.createCache(cacheConfiguration(nearCache));

final Integer keyId = primaryKey(cache);
final Integer keyId = primaryKey(ig.cache(DEFAULT_CACHE_NAME));

CountDownLatch blockOnce = new CountDownLatch(1);
IgniteTransactions transactions = cl.transactions();

for (Ignite ig0 : G.allGrids()) {
if (ig0.configuration().isClientMode())
continue;
assertFalse(checkMetrics(ig));

TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
AtomicBoolean doTest = new AtomicBoolean(true);

commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) {
blockOnce.countDown();
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(
() -> {
while (doTest.get()) {
try (Transaction tx = transactions.txStart(concurrency, isolation)) {
clientCache.put(keyId, 0);

return true;
tx.commit();
}

return false;
}
});
}

IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);
tx.commit();
}
});

blockOnce.await();

GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();

for (int i = 0; i < contCnt; ++i) {
IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);

tx.commit();

txLatch.countDown();
}
});

finishFut.add(f0);
},
contCnt,
"txThread-" + testName.getMethodName());

try {
assertTrue(GridTestUtils.waitForCondition(
() -> checkMetrics(ig),
getTestTimeout()));
}

finishFut.markInitialized();

for (Ignite ig0 : G.allGrids()) {
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();

if (ig0.configuration().isClientMode())
continue;

commSpi0.stopBlock();
finally {
doTest.set(false);
fut.get(getTestTimeout());
}
}

/**
* Checks if the transaction collision metrics contain the string "queueSize" for the given Ignite instance.
*
* @param ig Ignite instance.
* @return {@code true} if the metrics contain "queueSize"; otherwise {@code false}.
*/
private static boolean checkMetrics(Ignite ig) {
IgniteTxManager srvTxMgr = ((IgniteEx)ig).context().cache().context().tm();

assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics();

String coll1 = metrics.getTxKeyCollisions();

if (!coll1.isEmpty()) {
String coll2 = metrics.getTxKeyCollisions();

// check idempotent
assertEquals(coll1, coll2);

assertTrue(coll1.contains("queueSize"));

return true;
}
else
return false;
}
}, 10_000));

f.get();

finishFut.get();
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

txLatch.await();
return ig.cache(DEFAULT_CACHE_NAME)
.localMetrics()
.getTxKeyCollisions()
.contains("queueSize");
}
}

0 comments on commit 056189c

Please sign in to comment.