Skip to content

Commit

Permalink
IGNITE-22732 ModifyNode support
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Jul 15, 2024
1 parent 012f92b commit 70c9919
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProxyImpl;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
Expand Down Expand Up @@ -198,9 +198,10 @@ private void flushTuples(boolean force) throws IgniteCheckedException {

GridCacheContext<Object, Object> cctx = desc.cacheContext();

boolean sqlTxAwareEnabled = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
if (sqlTxAwareEnabled && cctx.transactional() && cctx.shared().tm().inUserTx()) {
invokeInsideTx(tuples);
GridNearTxLocal userTx = context().unwrap(GridNearTxLocal.class);

if (userTx != null) {
invokeInsideTx(userTx, tuples);

updatedRows += tuples.size();
}
Expand Down Expand Up @@ -236,31 +237,38 @@ private IgniteSQLException conflictKeysException(List<Object> conflictKeys) {
}

/** */
private void invokeInsideTx(List<ModifyTuple> tuples) throws IgniteCheckedException {
GridCacheProxyImpl<Object, Object> cache = desc.cacheContext().cache().keepBinary();
private void invokeInsideTx(GridNearTxLocal userTx, List<ModifyTuple> tuples) throws IgniteCheckedException {
userTx.resume();

for (ModifyTuple entry : tuples) {
assert entry.getOp() == op || op == TableModify.Operation.MERGE : entry.getOp();
try {
GridCacheProxyImpl<Object, Object> cache = desc.cacheContext().cache().keepBinary();

switch (entry.getOp()) {
case INSERT:
if (cache.get(entry.getKey()) != null)
throw conflictKeysException(Collections.singletonList(entry.getKey()));
for (ModifyTuple entry : tuples) {
assert entry.getOp() == op || op == TableModify.Operation.MERGE : entry.getOp();

case UPDATE:
cache.put(entry.getKey(), entry.getValue());
switch (entry.getOp()) {
case INSERT:
if (cache.get(entry.getKey()) != null)
throw conflictKeysException(Collections.singletonList(entry.getKey()));

break;
case DELETE:
assert op == TableModify.Operation.DELETE;
case UPDATE:
cache.put(entry.getKey(), entry.getValue());

cache.remove(entry.getKey());
break;
case DELETE:
assert op == TableModify.Operation.DELETE;

break;
default:
throw new AssertionError("Unexpected tuple operation: " + entry.getOp());
cache.remove(entry.getKey());

break;
default:
throw new AssertionError("Unexpected tuple operation: " + entry.getOp());
}
}
}
finally {
userTx.suspend();
}
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
Expand All @@ -35,6 +36,7 @@
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
Expand All @@ -43,11 +45,11 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

/** */
@RunWith(Parameterized.class)
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_ALLOW_TX_AWARE_QUERIES, value = "true")
public class TransactionIsolationTest extends GridCommonAbstractTest {
/** */
public static final String USERS = "USERS";
Expand Down Expand Up @@ -159,15 +161,6 @@ public static Collection<?> parameters() {
cli.cache(USERS).removeAll();
}

/** */
@Test
public void testVisibility() {
try (Transaction tx = cli.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 1_000, 10)) {

tx.rollback();
}
}

/** */
@Test
public void testInsert() {
Expand All @@ -178,11 +171,11 @@ public void testInsert() {
insert(F.t(4, JOHN));

assertEquals(CACHE, JOHN, select(4, CACHE));
assertEquals(SQL, JOHN, select(4, SQL));
//assertEquals(SQL, JOHN, select(4, SQL));
});

assertNull(CACHE, select(4, CACHE));
assertNull(SQL, select(4, SQL));
//assertNull(SQL, select(4, SQL));
}

/** */
Expand All @@ -195,12 +188,12 @@ public void testUpdate() {
update(F.t(1, SARAH));

assertEquals(SARAH, select(1, CACHE));
assertEquals(SARAH, select(1, SQL));
//assertEquals(SARAH, select(1, SQL));

update(F.t(1, KYLE));

assertEquals(KYLE, select(1, CACHE));
assertEquals(KYLE, select(1, SQL));
//assertEquals(KYLE, select(1, SQL));
});

assertEquals(JOHN, select(1, CACHE));
Expand All @@ -217,7 +210,7 @@ public void testDelete() {
delete(1);

assertNull(select(1, CACHE));
assertNull(select(1, SQL));
//assertNull(select(1, SQL));
});

assertEquals(JOHN, select(1, CACHE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.ScanQuery;
Expand All @@ -30,6 +31,7 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
Expand All @@ -38,6 +40,7 @@
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

/** */
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_ALLOW_TX_AWARE_QUERIES, value = "true")
public class TransactionVisibilityTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,11 +1563,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ALLOW_DML_INSIDE_TRANSACTION = "IGNITE_ALLOW_DML_INSIDE_TRANSACTION";

/**
* When set to true, Ignite will execute SQL queries in transaction aware mode
* When set to true, Ignite will execute SQL and scan queries in transaction aware mode
* Default is {@code false}.
*/
@SystemProperty("When set to true, Ignite will execute SQL queries in transaction aware mode")
public static final String IGNITE_ALLOW_TX_AWARE_SQL = "IGNITE_ALLOW_TX_AWARE_SQL";
@SystemProperty("When set to true, Ignite will execute SQL and scan queries in transaction aware mode")
public static final String IGNITE_ALLOW_TX_AWARE_QUERIES = "IGNITE_ALLOW_TX_AWARE_QUERIES";

/**
* Timeout between ZooKeeper client retries, default 2s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ public IgniteInternalTx tx(IgniteInternalTx tx) {
/**
* @return Transaction for current thread.
*/
public <T> T tx() {
public <T extends IgniteInternalTx> T tx() {
IgniteInternalTx tx = txContext();

return tx != null ? (T)tx : (T)tx(null, Thread.currentThread().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
Expand Down Expand Up @@ -158,6 +159,7 @@
import static java.util.Objects.nonNull;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_TX_AWARE_QUERIES;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
Expand Down Expand Up @@ -3029,7 +3031,11 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
if (qry.isLocal() && ctx.clientNode())
throw new CacheException("Execution of local SqlFieldsQuery on client node disallowed.");

return executeQuerySafe(cctx, () -> {
final GridNearTxLocal userTx = IgniteSystemProperties.getBoolean(IGNITE_ALLOW_TX_AWARE_QUERIES)
? ctx.cache().context().tm().userTx()
: null;

return executeQuerySafe(cctx, userTx, () -> {
final String schemaName = qry.getSchema() == null ? schemaName(cctx) : qry.getSchema();

IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>> clo =
Expand All @@ -3048,17 +3054,19 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
failOnMultipleStmts
);

QueryContext qryCtx = QueryContext.of(qry, cliCtx, cancel, qryProps, userTx);

if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) {
res = qryEngine.queryBatched(
QueryContext.of(qry, cliCtx, cancel, qryProps),
qryCtx,
schemaName,
qry.getSql(),
((SqlFieldsQueryEx)qry).batchedArguments()
);
}
else {
res = qryEngine.query(
QueryContext.of(qry, cliCtx, cancel, qryProps),
qryCtx,
schemaName,
qry.getSql(),
qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY
Expand Down Expand Up @@ -3094,7 +3102,7 @@ public List<JdbcParameterMeta> parameterMetaData(
) {
checkxModuleEnabled();

return executeQuerySafe(null, () -> {
return executeQuerySafe(null, null, () -> {
final String schemaName = qry.getSchema() == null ? QueryUtils.DFLT_SCHEMA : qry.getSchema();

QueryEngine qryEngine = engineForQuery(cliCtx, qry);
Expand Down Expand Up @@ -3122,7 +3130,7 @@ public List<GridQueryFieldMetadata> resultSetMetaData(
) {
checkxModuleEnabled();

return executeQuerySafe(null, () -> {
return executeQuerySafe(null, null, () -> {
final String schemaName = qry.getSchema() == null ? QueryUtils.DFLT_SCHEMA : qry.getSchema();

QueryEngine qryEngine = engineForQuery(cliCtx, qry);
Expand Down Expand Up @@ -3199,27 +3207,43 @@ private QueryEngine engineForQuery(SqlClientContext cliCtx, SqlFieldsQuery qry)
* Execute query setting busy lock, preserving current cache context and properly handling checked exceptions.
*
* @param cctx Cache context.
* @param userTx User transaction.
* @param supplier Code to be executed.
* @return Result.
*/
private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> cctx, GridPlainOutClosure<T> supplier) {
private <T> T executeQuerySafe(
@Nullable final GridCacheContext<?, ?> cctx,
@Nullable final GridNearTxLocal userTx,
GridPlainOutClosure<T> supplier
) {
GridCacheContext oldCctx = curCache.get();

curCache.set(cctx);

if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");

try {
return supplier.apply();
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");

if (userTx != null)
userTx.suspend();

try {
return supplier.apply();
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
}
finally {
curCache.set(oldCctx);

busyLock.leaveBusy();

if (userTx != null)
userTx.resume();
}
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
}
finally {
curCache.set(oldCctx);

busyLock.leaveBusy();
throw new IgniteException(e);
}
}

Expand Down

0 comments on commit 70c9919

Please sign in to comment.