Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23138 Fix TxWithKeyContentionSelfTest (Cache 12 group) #11512

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,31 @@

package org.apache.ignite.internal.processors.cache.transactions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
Expand All @@ -59,21 +51,14 @@

/** Tests tx key contention detection functional. */
public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
/** Client flag. */
private boolean client;

/** Near cache flag. */
private boolean nearCache;
/** */
@Rule
public TestName testName = new TestName();

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);

cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));

if (client)
cfg.setClientMode(true);

cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
Expand All @@ -82,24 +67,12 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
)
);

TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();

cfg.setCommunicationSpi(commSpi);

cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));

if (client) {
cfg.setConsistentId("Client");

cfg.setClientMode(client);
}

return cfg;
}

/** */
protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name)
private CacheConfiguration<Integer, Integer> cacheConfiguration(boolean nearCache) {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setAffinity(new RendezvousAffinityFunction(false, 16))
Expand Down Expand Up @@ -134,7 +107,7 @@ public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -145,9 +118,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Except
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, true);
}

/**
Expand All @@ -156,7 +127,7 @@ public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Ex
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -165,9 +136,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -176,7 +145,7 @@ public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, false);
}

/**
Expand All @@ -185,9 +154,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exceptio
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, true);
}

/**
Expand All @@ -196,7 +163,7 @@ public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exce
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Exception {
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, false);
}

/**
Expand All @@ -205,9 +172,7 @@ public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Excepti
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
nearCache = true;

runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, true);
}

/** Tests metric correct results while tx collisions occured.
Expand All @@ -216,120 +181,67 @@ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exc
* @param isolation Isolation level.
* @throws Exception If failed.
*/
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean nearCache)
throws Exception {
Ignite ig = startGridsMultiThreaded(3);

int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5;

CountDownLatch txLatch = new CountDownLatch(contCnt);

ig.cluster().state(ClusterState.ACTIVE);

client = true;

Ignite cl = startGrid();

IgniteTransactions cliTxMgr = cl.transactions();
int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 2;

IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
Ignite cl = startClientGrid();

IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Integer> clientCache = cl.createCache(cacheConfiguration(nearCache));

final Integer keyId = primaryKey(cache);
final Integer keyId = primaryKey(ig.cache(DEFAULT_CACHE_NAME));

CountDownLatch blockOnce = new CountDownLatch(1);
IgniteTransactions transactions = cl.transactions();

for (Ignite ig0 : G.allGrids()) {
if (ig0.configuration().isClientMode())
continue;
assertFalse(checkMetrics(ig));

TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
AtomicBoolean doTest = new AtomicBoolean(true);

commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) {
blockOnce.countDown();
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(
() -> {
while (doTest.get()) {
try (Transaction tx = transactions.txStart(concurrency, isolation)) {
clientCache.put(keyId, 0);

return true;
tx.commit();
}

return false;
}
});
}

IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);
tx.commit();
}
});

blockOnce.await();

GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();

for (int i = 0; i < contCnt; ++i) {
IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
cache0.put(keyId, 0);

tx.commit();

txLatch.countDown();
}
});

finishFut.add(f0);
},
contCnt,
"txThread-" + testName.getMethodName());

try {
assertTrue(GridTestUtils.waitForCondition(
() -> checkMetrics(ig),
getTestTimeout()));
}

finishFut.markInitialized();

for (Ignite ig0 : G.allGrids()) {
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();

if (ig0.configuration().isClientMode())
continue;

commSpi0.stopBlock();
finally {
doTest.set(false);
fut.get(getTestTimeout());
}
}

/**
* Checks if the transaction collision metrics contain the string "queueSize" for the given Ignite instance.
*
* @param ig Ignite instance.
* @return {@code true} if the metrics contain "queueSize"; otherwise {@code false}.
*/
private static boolean checkMetrics(Ignite ig) {
IgniteTxManager srvTxMgr = ((IgniteEx)ig).context().cache().context().tm();

assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics();

String coll1 = metrics.getTxKeyCollisions();

if (!coll1.isEmpty()) {
String coll2 = metrics.getTxKeyCollisions();

// check idempotent
assertEquals(coll1, coll2);

assertTrue(coll1.contains("queueSize"));

return true;
}
else
return false;
}
}, 10_000));

f.get();

finishFut.get();
try {
U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo");
}
catch (IgniteCheckedException e) {
fail(e.toString());
}

txLatch.await();
return ig.cache(DEFAULT_CACHE_NAME)
.localMetrics()
.getTxKeyCollisions()
.contains("queueSize");
}
}