diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java index d760ac16ba968..4b40be1e9f144 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java @@ -52,9 +52,11 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.running.TrackableQuery; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.session.SessionContext; import org.jetbrains.annotations.Nullable; /** @@ -159,8 +161,19 @@ public RootQuery( * @param schema new schema. */ public RootQuery childQuery(SchemaPlus schema) { - return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), ctx.isForcedJoinOrder(), - ctx.partitions(), exch, unregister, log, plannerTimeout, totalTimeout); + return new RootQuery<>( + sql, + schema, + params, + QueryContext.of(cancel, ctx.unwrap(GridResourceProcessor.class), ctx.unwrap(SessionContext.class)), + ctx.isLocal(), + ctx.isForcedJoinOrder(), + ctx.partitions(), + exch, + unregister, + log, + plannerTimeout, + totalTimeout); } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java index 80800fc2d3141..b054653c66eda 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -46,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges; import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory; import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.ReflectiveCallNotNullImplementor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker; @@ -59,10 +61,13 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.session.SessionContext; +import org.apache.ignite.session.SessionContextProvider; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -78,6 +83,9 @@ public class ExecutionContext extends AbstractQueryContext implements DataC /** Placeholder for NULL values in search bounds. */ private static final Object NULL_BOUND = new Object(); + /** Emtpy session context. */ + private static final SessionContext EMPTY_SESSION_CONTEXT = (attrName) -> null; + /** */ private final UUID qryId; @@ -126,6 +134,12 @@ public class ExecutionContext extends AbstractQueryContext implements DataC /** */ private final long startTs; + /** Map associates UDF name to instance of class that contains this UDF. */ + private final Map udfObjs = new ConcurrentHashMap<>(); + + /** Session context provider injected into UDF targets. */ + private final SessionContextProvider sesCtxProv = new SessionContextProviderImpl(); + /** */ private Object[] correlations = new Object[16]; @@ -457,6 +471,31 @@ public IoTracker ioTracker() { return ioTracker; } + /** + * Return an object contained a user defined function. If not exist yet, then instantiate the object and inject resources into it. + * Used by {@link ReflectiveCallNotNullImplementor} while it is preparing user function call. + * + * @param udfClsName Classname of the class contained UDF. + * @return Object with injected resources. + */ + public Object udfObject(String udfClsName) { + return udfObjs.computeIfAbsent(udfClsName, ignore -> { + try { + Class funcCls = getClass().getClassLoader().loadClass(udfClsName); + + Object target = funcCls.getConstructor().newInstance(); + + unwrap(GridResourceProcessor.class).injectToUdf(target, sesCtxProv); + + return target; + } + catch (Exception e) { + throw new IgniteException("Failed to instantiate an object for UDF. " + + "Class " + udfClsName + " must have public zero-args constructor.", e); + } + }); + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) @@ -473,4 +512,14 @@ public IoTracker ioTracker() { @Override public int hashCode() { return Objects.hash(qryId, fragmentDesc.fragmentId()); } + + /** */ + private class SessionContextProviderImpl implements SessionContextProvider { + /** {@inheritDoc} */ + @Override public @Nullable SessionContext getSessionContext() { + SessionContext ctx = unwrap(SessionContext.class); + + return ctx == null ? EMPTY_SESSION_CONTEXT : ctx; + } + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 252358a870f52..754be36144bdc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -653,6 +654,8 @@ private ListFieldsQueryCursor mapAndExecutePlan( qry.onResponse(nodeId, fragment.fragmentId(), ex); else { try { + SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class); + QueryStartRequest req = new QueryStartRequest( qry.id(), qry.localQueryId(), @@ -664,7 +667,8 @@ private ListFieldsQueryCursor mapAndExecutePlan( qry.parameters(), parametersMarshalled, timeout, - ectx.getQryTxEntries() + ectx.getQryTxEntries(), + sesCtx == null ? null : sesCtx.attributes() ); messageService().send(nodeId, req); @@ -851,7 +855,9 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) { ) ); - final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema()); + final BaseQueryContext qctx = createQueryContext( + Contexts.of(ctx.resource(), msg.appAttrs() == null ? null : new SessionContextImpl(msg.appAttrs())), + msg.schema()); QueryPlan qryPlan = queryPlanCache().queryPlan( new CacheKey(msg.schema(), msg.root()), diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java index 0e44c88d6e4d4..41655ef16d5f1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.calcite.exec.exp; import java.lang.reflect.Method; -import java.lang.reflect.Modifier; import org.apache.calcite.adapter.enumerable.NullPolicy; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -47,8 +46,6 @@ private IgniteScalarFunction(Method method, CallImplementor implementor) { * @return Created {@link ScalarFunction}. */ public static ScalarFunction create(Method method) { - assert Modifier.isStatic(method.getModifiers()); - CallImplementor implementor = RexImpTable.createImplementor( new ReflectiveCallNotNullImplementor(method), NullPolicy.NONE, false); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ReflectiveCallNotNullImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ReflectiveCallNotNullImplementor.java index 857605b7201fe..5e5ab934ec244 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ReflectiveCallNotNullImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ReflectiveCallNotNullImplementor.java @@ -22,7 +22,9 @@ import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Types; import org.apache.calcite.rex.RexCall; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; /** * Implementation of {@link NotNullImplementor} that calls a given {@link Method}. @@ -55,10 +57,13 @@ public ReflectiveCallNotNullImplementor(Method method) { callExpr = Expressions.call(method, translatedOperands); else { - // The UDF class must have a public zero-args constructor. - // Assume that the validator checked already. - final Expression target = - Expressions.new_(method.getDeclaringClass()); + final Expression target = Expressions.convert_( + Expressions.call( + translator.getRoot(), + Types.lookupMethod(ExecutionContext.class, "udfObject", String.class), + Expressions.constant(method.getDeclaringClass().getName())), + method.getDeclaringClass()); + callExpr = Expressions.call(target, method, translatedOperands); } if (!containsCheckedException(method)) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java index 4b63bf0256703..4913e965c400a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; @@ -71,6 +72,9 @@ public class QueryStartRequest implements MarshalableMessage, ExecutionContextAw @GridDirectCollection(QueryTxEntry.class) private @Nullable Collection qryTxEntries; + /** */ + private Map appAttrs; + /** */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public QueryStartRequest( @@ -84,7 +88,8 @@ public QueryStartRequest( Object[] params, @Nullable byte[] paramsBytes, long timeout, - Collection qryTxEntries + Collection qryTxEntries, + @Nullable Map appAttrs ) { this.qryId = qryId; this.originatingQryId = originatingQryId; @@ -97,6 +102,7 @@ public QueryStartRequest( this.paramsBytes = paramsBytes; // If we already have marshalled params, use it. this.timeout = timeout; this.qryTxEntries = qryTxEntries; + this.appAttrs = appAttrs; } /** */ @@ -184,6 +190,11 @@ public long timeout() { return qryTxEntries; } + /** */ + public Map appAttrs() { + return appAttrs; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { if (paramsBytes == null && params != null) @@ -284,6 +295,11 @@ public long timeout() { writer.incrementState(); + case 10: + if (!writer.writeMap("appAttrs", appAttrs, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING)) + return false; + + writer.incrementState(); } return true; @@ -377,6 +393,14 @@ public long timeout() { reader.incrementState(); + case 10: + appAttrs = reader.readMap("appAttrs", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(QueryStartRequest.class); @@ -389,6 +413,6 @@ public long timeout() { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/cache/SessionContextSqlFunctionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/cache/SessionContextSqlFunctionTest.java new file mode 100644 index 0000000000000..5786207a7aed1 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/cache/SessionContextSqlFunctionTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.SqlConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.SessionContextProviderResource; +import org.apache.ignite.session.SessionContext; +import org.apache.ignite.session.SessionContextProvider; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** */ +@RunWith(Parameterized.class) +public class SessionContextSqlFunctionTest extends GridCommonAbstractTest { + /** */ + private static final String SESSION_ID = "sessionId"; + + /** */ + private Ignite ign; + + /** */ + @Parameterized.Parameter + public CacheAtomicityMode mode; + + /** */ + @Parameterized.Parameter(1) + public boolean clnNode; + + /** */ + @Parameterized.Parameters(name = "mode={0}, clnNode={1}") + public static List parameters() { + List params = new ArrayList<>(); + + for (CacheAtomicityMode m: CacheAtomicityMode.values()) { + params.add(new Object[] {m, false}); + params.add(new Object[] {m, true}); + } + + return params; + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSqlConfiguration(new SqlConfiguration() + .setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration())); + + cfg.setCacheConfiguration( + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setSqlSchema("PUBLIC") + .setSqlFunctionClasses(SessionContextSqlFunctions.class)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ign = startGrids(3); + + if (clnNode) + ign = startClientGrid(3); + + ignQuery(ign, "create table PUBLIC.MYTABLE(id int primary key, sessionId varchar);"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** */ + @Test + public void testWhereClause() { + for (int i = 0; i < 100; i++) { + String sesId = i % 2 == 0 ? "1" : "2"; + + ignQuery(ign, "insert into PUBLIC.MYTABLE(id, sessionId) values (?, ?);", i, sesId); + } + + for (String sesId: F.asList("1", "2")) { + Ignite ignApp = ign.withApplicationAttributes(F.asMap(SESSION_ID, sesId)); + + List> rows = ignQuery(ignApp, "select * from PUBLIC.MYTABLE where sessionId = sessionId();"); + + assertEquals(50, rows.size()); + + for (List row: rows) { + String actSesId = row.get(1).toString(); + + assertEquals(sesId, actSesId); + } + } + } + + /** */ + @Test + public void testInsertClause() { + for (int i = 0; i < 100; i++) { + String sesId = i % 2 == 0 ? "1" : "2"; + + Ignite ignApp = ign.withApplicationAttributes(F.asMap(SESSION_ID, sesId)); + + ignQuery(ignApp, "insert into PUBLIC.MYTABLE(id, sessionId) values (" + i + ", sessionId());"); + } + + List> res = ignQuery(ign, "select * from PUBLIC.MYTABLE where sessionId = 1"); + + assertEquals(50, res.size()); + + res = ignQuery(ign, "select * from PUBLIC.MYTABLE where sessionId = 2"); + + assertEquals(50, res.size()); + } + + /** */ + @Test + public void testNestedQuery() { + for (int i = 0; i < 100; i++) { + String sesId = i % 2 == 0 ? "1" : "2"; + + ignQuery(ign, "insert into PUBLIC.MYTABLE(id, sessionId) values (?, ?);", i, sesId); + } + + String sesId = "1"; + + Ignite ignApp = ign.withApplicationAttributes(F.asMap(SESSION_ID, sesId)); + + List> rows = ignQuery(ignApp, "select * from PUBLIC.MYTABLE where sessionId = (select sessionId());"); + + int size = 0; + + for (List row: rows) { + String actSesId = row.get(1).toString(); + + assertEquals(sesId, actSesId); + + size++; + } + + assertEquals(50, size); + } + + /** */ + @Test + public void testOverwriteApplicationAttributes() { + Ignite ignApp = ign; + + for (int i = 0; i < 100; i++) { + String sesId = i % 2 == 0 ? "1" : "2"; + + ignApp = ignApp.withApplicationAttributes(F.asMap(SESSION_ID, sesId)); + + ignQuery(ignApp, "insert into PUBLIC.MYTABLE(id, sessionId) values (" + i + ", sessionId());"); + } + + List> res = ignQuery(ign, "select * from PUBLIC.MYTABLE where sessionId = 1"); + + assertEquals(50, res.size()); + + res = ignQuery(ign, "select * from PUBLIC.MYTABLE where sessionId = 2"); + + assertEquals(50, res.size()); + } + + /** */ + @Test + public void testMultithreadApplication() throws Exception { + String sesId = "1"; + + Ignite ignApp = ign.withApplicationAttributes(F.asMap(SESSION_ID, sesId)); + + IgniteInternalFuture insertFut = multithreadedAsync(() -> { + for (int i = 0; i < 100; i++) + ignQuery(ignApp, "insert into PUBLIC.MYTABLE(id, sessionId) values (" + i + ", sessionId());"); + }, 1, "insert"); + + insertFut.get(getTestTimeout()); + + List> res = ignQuery(ign, "select * from PUBLIC.MYTABLE where sessionId = " + sesId); + + assertEquals(100, res.size()); + } + + /** */ + private List> ignQuery(Ignite ign, String sql, Object... args) { + return ign.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery(sql).setArgs(args)).getAll(); + } + + /** */ + public static class SessionContextSqlFunctions { + /** */ + @SessionContextProviderResource + public SessionContextProvider sesCtxProv; + + /** */ + @QuerySqlFunction + public String sessionId() { + SessionContext sesCtx = sesCtxProv.getSessionContext(); + + return sesCtx == null ? null : sesCtx.getAttribute(SESSION_ID); + } + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java index ee4547cd4ea3d..6198b40ea30c3 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import org.apache.ignite.internal.processors.cache.SessionContextSqlFunctionTest; import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeSortedIndexTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest; @@ -55,6 +56,7 @@ RuntimeSortedIndexTest.class, LimitExecutionTest.class, TimeCalculationExecutionTest.class, + SessionContextSqlFunctionTest.class }) public class ExecutionTestSuite { } diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 4680150b47ef6..3518066b600db 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -18,11 +18,13 @@ package org.apache.ignite; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import javax.cache.CacheException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.AtomicConfiguration; @@ -38,6 +40,7 @@ import org.apache.ignite.metric.IgniteMetrics; import org.apache.ignite.plugin.IgnitePlugin; import org.apache.ignite.plugin.PluginNotFoundException; +import org.apache.ignite.session.SessionContextProvider; import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; import org.apache.ignite.spi.tracing.TracingConfigurationManager; import org.jetbrains.annotations.NotNull; @@ -788,4 +791,18 @@ public IgniteQueue queue(String name, int cap, @Nullable CollectionConfig */ @IgniteExperimental public @NotNull TracingConfigurationManager tracingConfiguration(); + + /** + * Underlying operations of returned Ignite instance are aware of application attributes. + * User defined functions can access the attributes with {@link SessionContextProvider} API. + * List of supported types of user defined functions that have access the attributes: + *
    + *
  • {@link QuerySqlFunction}
  • + *
+ * + * @param attrs Application attributes. + * @return Ignite instance that is aware of application attributes. + */ + @IgniteExperimental + public Ignite withApplicationAttributes(Map attrs); } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/annotations/QuerySqlFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/query/annotations/QuerySqlFunction.java index e7237bf109bf7..f2fa42125a7b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/annotations/QuerySqlFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/annotations/QuerySqlFunction.java @@ -22,10 +22,11 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.apache.ignite.resources.SessionContextProviderResource; /** - * Annotates public static methods in classes to be used in SQL queries as custom functions. - * Annotated class must be registered in H2 indexing SPI using following method + * Annotates public methods in classes to be used in SQL queries as custom functions. + * Annotated class must be registered using following method * {@link org.apache.ignite.configuration.CacheConfiguration#setSqlFunctionClasses(Class[])}. *

* Example usage: @@ -37,15 +38,29 @@ * } * } * - * // Register. - * indexing.setSqlFunctionClasses(MyFunctions.class); + * // Register in CacheConfiguration. + * cacheCfg.setSqlFunctionClasses(MyFunctions.class); * * // And use in queries. - * cache.queries().createSqlFieldsQuery("select sqr(2) where sqr(1) = 1"); + * ign.query(new SqlFieldsQuery("select sqr(2) where sqr(1) = 1")); * *

- * For more information about H2 custom functions please refer to - * H2 documentation. + * SQL functions can use attributes set on client side: + *

+ *     public class MyFunctions {
+ *         @SessionContextProviderResource
+ *         public SessionContextProvider sesCtxProv;
+ *
+ *         @QuerySqlFunction
+ *         public String sessionId() {
+ *             return sesCtxProv.getSessionContext().getAttribute("SESSION_ID");
+ *         }
+ *     }
+ * 
+ * Note, accessing to the attributes is available in the Calcite query engine only. In a such case a class must have public + * zero-args constructor. + * + * @see SessionContextProviderResource */ @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteApplicationAttributesAware.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteApplicationAttributesAware.java new file mode 100644 index 0000000000000..b0aaefc1199bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteApplicationAttributesAware.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import javax.cache.CacheException; +import org.apache.ignite.DataRegionMetrics; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.IgniteAtomicReference; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteAtomicStamped; +import org.apache.ignite.IgniteBinary; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteEncryption; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteScheduler; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.IgniteServices; +import org.apache.ignite.IgniteSet; +import org.apache.ignite.IgniteSnapshot; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.MemoryMetrics; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.metric.IgniteMetrics; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginNotFoundException; +import org.apache.ignite.spi.tracing.TracingConfigurationManager; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** Ignite instance aware of application attributes set with {@link Ignite#withApplicationAttributes(Map)}. */ +public class IgniteApplicationAttributesAware implements Ignite { + /** */ + private final Ignite delegate; + + /** Application attributes. */ + private final Map attrs; + + /** Stores caches configured with the application attributes. */ + private final Map> appAttrCaches = new ConcurrentHashMap<>(); + + /** + * @param delegate Parent Ignite instance. + * @param attrs Application attributes. + */ + public IgniteApplicationAttributesAware(Ignite delegate, Map attrs) { + A.notNull(attrs, "application attributes"); + + this.delegate = delegate; + this.attrs = new HashMap<>(attrs); + } + + /** + * Set application attributes to all returned caches. + * + * @param cache Cache, or {@code null}. + * @return Cache with application attributes. + */ + private IgniteCache withApplicationAttributes(@Nullable IgniteCache cache) { + if (cache == null) + return null; + + return (IgniteCache)appAttrCaches + .computeIfAbsent(cache.getName(), c -> ((IgniteCacheProxy)cache).withApplicationAttributes(attrs)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache(CacheConfiguration cacheCfg) throws CacheException { + return withApplicationAttributes(delegate.createCache(cacheCfg)); + } + + /** {@inheritDoc} */ + @Override public Collection createCaches(Collection cacheCfgs) throws CacheException { + return delegate.createCaches(cacheCfgs) + .stream() + .map(c -> withApplicationAttributes(c)) + .collect(Collectors.toList()); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache(String cacheName) throws CacheException { + return withApplicationAttributes(delegate.createCache(cacheName)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache(CacheConfiguration cacheCfg) throws CacheException { + IgniteCache cache = (IgniteCache)appAttrCaches.get(cacheCfg.getName()); + + if (cache != null) + return cache; + + return withApplicationAttributes(delegate.getOrCreateCache(cacheCfg)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache(String cacheName) throws CacheException { + IgniteCache cache = (IgniteCache)appAttrCaches.get(cacheName); + + if (cache != null) + return cache; + + return withApplicationAttributes(delegate.getOrCreateCache(cacheName)); + } + + /** {@inheritDoc} */ + @Override public Collection getOrCreateCaches(Collection cacheCfgs) throws CacheException { + return delegate.getOrCreateCaches(cacheCfgs) + .stream() + .map(c -> withApplicationAttributes(c)) + .collect(Collectors.toList()); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache( + CacheConfiguration cacheCfg, NearCacheConfiguration nearCfg + ) throws CacheException { + return withApplicationAttributes(delegate.createCache(cacheCfg, nearCfg)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache( + CacheConfiguration cacheCfg, NearCacheConfiguration nearCfg + ) throws CacheException { + return withApplicationAttributes(delegate.getOrCreateCache(cacheCfg, nearCfg)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createNearCache( + String cacheName, NearCacheConfiguration nearCfg + ) throws CacheException { + return withApplicationAttributes(delegate.createNearCache(cacheName, nearCfg)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateNearCache( + String cacheName, NearCacheConfiguration nearCfg + ) throws CacheException { + return withApplicationAttributes(delegate.getOrCreateNearCache(cacheName, nearCfg)); + } + + /** {@inheritDoc} */ + @Override public IgniteCache cache(String name) throws CacheException { + IgniteCache cache = (IgniteCache)appAttrCaches.get(name); + + if (cache != null) + return cache; + + return withApplicationAttributes(delegate.cache(name)); + } + + /** {@inheritDoc} */ + @Override public String name() { + return delegate.name(); + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log() { + return delegate.log(); + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration configuration() { + return delegate.configuration(); + } + + /** {@inheritDoc} */ + @Override public IgniteCluster cluster() { + return delegate.cluster(); + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute() { + return delegate.compute(); + } + + /** {@inheritDoc} */ + @Override public IgniteMetrics metrics() { + return delegate.metrics(); + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute(ClusterGroup grp) { + return delegate.compute(grp); + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message() { + return delegate.message(); + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message(ClusterGroup grp) { + return delegate.message(grp); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events() { + return delegate.events(); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events(ClusterGroup grp) { + return delegate.events(grp); + } + + /** {@inheritDoc} */ + @Override public IgniteServices services() { + return delegate.services(); + } + + /** {@inheritDoc} */ + @Override public IgniteServices services(ClusterGroup grp) { + return delegate.services(grp); + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService() { + return delegate.executorService(); + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService(ClusterGroup grp) { + return delegate.executorService(grp); + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return delegate.version(); + } + + /** {@inheritDoc} */ + @Override public IgniteScheduler scheduler() { + return delegate.scheduler(); + } + + /** {@inheritDoc} */ + @Override public void addCacheConfiguration(CacheConfiguration cacheCfg) throws CacheException { + delegate.addCacheConfiguration(cacheCfg); + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) throws CacheException { + delegate.destroyCache(cacheName); + + appAttrCaches.remove(cacheName); + } + + /** {@inheritDoc} */ + @Override public void destroyCaches(Collection cacheNames) throws CacheException { + delegate.destroyCaches(cacheNames); + + for (String c: cacheNames) + appAttrCaches.remove(c); + } + + /** {@inheritDoc} */ + @Override public Collection cacheNames() { + return delegate.cacheNames(); + } + + /** {@inheritDoc} */ + @Override public IgniteTransactions transactions() { + return delegate.transactions(); + } + + /** {@inheritDoc} */ + @Override public IgniteDataStreamer dataStreamer(String cacheName) throws IllegalStateException { + return delegate.dataStreamer(cacheName); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException { + return delegate.atomicSequence(name, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicSequence atomicSequence( + String name, AtomicConfiguration cfg, long initVal, boolean create + ) throws IgniteException { + return delegate.atomicSequence(name, cfg, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException { + return delegate.atomicLong(name, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicLong atomicLong( + String name, AtomicConfiguration cfg, long initVal, boolean create + ) throws IgniteException { + return delegate.atomicLong(name, cfg, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicReference atomicReference( + String name, @Nullable T initVal, boolean create + ) throws IgniteException { + return delegate.atomicReference(name, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicReference atomicReference( + String name, AtomicConfiguration cfg, @Nullable T initVal, boolean create + ) throws IgniteException { + return delegate.atomicReference(name, cfg, initVal, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicStamped atomicStamped( + String name, @Nullable T initVal, @Nullable S initStamp, boolean create + ) throws IgniteException { + return delegate.atomicStamped(name, initVal, initStamp, create); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicStamped atomicStamped( + String name, AtomicConfiguration cfg, @Nullable T initVal, @Nullable S initStamp, boolean create + ) throws IgniteException { + return delegate.atomicStamped(name, cfg, initVal, initStamp, create); + } + + /** {@inheritDoc} */ + @Override public IgniteCountDownLatch countDownLatch( + String name, int cnt, boolean autoDel, boolean create + ) throws IgniteException { + return delegate.countDownLatch(name, cnt, autoDel, create); + } + + /** {@inheritDoc} */ + @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create) throws IgniteException { + return delegate.semaphore(name, cnt, failoverSafe, create); + } + + /** {@inheritDoc} */ + @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create) throws IgniteException { + return delegate.reentrantLock(name, failoverSafe, fair, create); + } + + /** {@inheritDoc} */ + @Override public IgniteQueue queue(String name, int cap, @Nullable CollectionConfiguration cfg) throws IgniteException { + return delegate.queue(name, cap, cfg); + } + + /** {@inheritDoc} */ + @Override public IgniteSet set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException { + return delegate.set(name, cfg); + } + + /** {@inheritDoc} */ + @Override public T plugin(String name) throws PluginNotFoundException { + return delegate.plugin(name); + } + + /** {@inheritDoc} */ + @Override public IgniteBinary binary() { + return delegate.binary(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + delegate.close(); + } + + /** {@inheritDoc} */ + @Override public Affinity affinity(String cacheName) { + return delegate.affinity(cacheName); + } + + /** {@inheritDoc} */ + @Override public boolean active() { + return delegate.active(); + } + + /** {@inheritDoc} */ + @Override public void active(boolean active) { + delegate.active(active); + } + + /** {@inheritDoc} */ + @Override public void resetLostPartitions(Collection cacheNames) { + delegate.resetLostPartitions(cacheNames); + } + + /** {@inheritDoc} */ + @Override public Collection memoryMetrics() { + return delegate.memoryMetrics(); + } + + /** {@inheritDoc} */ + @Override public @Nullable MemoryMetrics memoryMetrics(String dataRegionName) { + return delegate.memoryMetrics(dataRegionName); + } + + /** {@inheritDoc} */ + @Override public Collection dataRegionMetrics() { + return delegate.dataRegionMetrics(); + } + + /** {@inheritDoc} */ + @Override public @Nullable DataRegionMetrics dataRegionMetrics(String memPlcName) { + return delegate.dataRegionMetrics(memPlcName); + } + + /** {@inheritDoc} */ + @Override public IgniteEncryption encryption() { + return delegate.encryption(); + } + + /** {@inheritDoc} */ + @Override public IgniteSnapshot snapshot() { + return delegate.snapshot(); + } + + /** {@inheritDoc} */ + @Override public @NotNull TracingConfigurationManager tracingConfiguration() { + return delegate.tracingConfiguration(); + } + + /** */ + @Override public Ignite withApplicationAttributes(Map attrs) { + if (this.attrs.equals(attrs)) + return this; + + return delegate.withApplicationAttributes(attrs); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 558c56f4001b8..1e1e8fb5212f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2898,6 +2898,11 @@ private Collection baselineNodes() { } } + /** {@inheritDoc} */ + @Override public Ignite withApplicationAttributes(Map attrs) { + return new IgniteApplicationAttributesAware(this, attrs); + } + /** {@inheritDoc} */ @Override public IgniteEncryption encryption() { return ctx.encryption(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java new file mode 100644 index 0000000000000..d28f5f1678c20 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.context; + +import java.util.Map; +import org.apache.ignite.session.SessionContext; +import org.jetbrains.annotations.Nullable; + +/** */ +public final class SessionContextImpl implements SessionContext { + /** Application attributes. */ + private final Map attrs; + + /** @param attrs Application attributes. */ + public SessionContextImpl(Map attrs) { + this.attrs = attrs; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getAttribute(String name) { + return attrs.get(name); + } + + /** @return Application attributes. */ + public Map attributes() { + return attrs; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index cb816f6431254..febda399d9f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.cache.ReadRepairStrategy; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -54,6 +56,9 @@ public class CacheOperationContext implements Serializable { /** Data center Id. */ private final Byte dataCenterId; + /** Application attributes. */ + private final Map appAttrs; + /** * Constructor with default values. */ @@ -65,6 +70,7 @@ public CacheOperationContext() { recovery = false; readRepairStrategy = null; dataCenterId = null; + appAttrs = null; } /** @@ -73,6 +79,7 @@ public CacheOperationContext() { * @param expiryPlc Expiry policy. * @param dataCenterId Data center id. * @param readRepairStrategy Read-repair strategy. + * @param appAttrs Application attributes. */ public CacheOperationContext( boolean skipStore, @@ -81,7 +88,8 @@ public CacheOperationContext( boolean noRetries, @Nullable Byte dataCenterId, boolean recovery, - @Nullable ReadRepairStrategy readRepairStrategy + @Nullable ReadRepairStrategy readRepairStrategy, + @Nullable Map appAttrs ) { this.skipStore = skipStore; this.keepBinary = keepBinary; @@ -90,6 +98,7 @@ public CacheOperationContext( this.dataCenterId = dataCenterId; this.recovery = recovery; this.readRepairStrategy = readRepairStrategy; + this.appAttrs = appAttrs; } /** @@ -119,7 +128,8 @@ public CacheOperationContext keepBinary() { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -152,7 +162,8 @@ public CacheOperationContext setSkipStore(boolean skipStore) { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -176,7 +187,8 @@ public CacheOperationContext withExpiryPolicy(ExpiryPolicy plc) { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -191,7 +203,8 @@ public CacheOperationContext setNoRetries(boolean noRetries) { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -206,7 +219,8 @@ public CacheOperationContext setDataCenterId(byte dataCenterId) { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -221,7 +235,8 @@ public CacheOperationContext setRecovery(boolean recovery) { noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); } /** @@ -236,7 +251,24 @@ public CacheOperationContext setReadRepairStrategy(ReadRepairStrategy readRepair noRetries, dataCenterId, recovery, - readRepairStrategy); + readRepairStrategy, + appAttrs); + } + + /** + * @param appAttrs Application attributes. + * @return New instance of CacheOperationContext with application attributes. + */ + public CacheOperationContext setApplicationAttributes(Map appAttrs) { + return new CacheOperationContext( + skipStore, + keepBinary, + expiryPlc, + noRetries, + dataCenterId, + recovery, + readRepairStrategy, + new HashMap<>(appAttrs)); } /** @@ -260,6 +292,13 @@ public boolean noRetries() { return noRetries; } + /** + * @return Application attributes. + */ + public Map applicationAttributes() { + return appAttrs; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheOperationContext.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 74713f6dbfdc2..fba0e990c5e97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -173,6 +173,20 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { } } + /** {@inheritDoc} */ + @Override public IgniteCache withApplicationAttributes(Map appAttrs) { + A.notNull(appAttrs, "application attributes"); + + CacheOperationGate opGate = onEnter(); + + try { + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setApplicationAttributes(appAttrs), lock); + } + finally { + onLeave(opGate); + } + } + /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy withNoRetries() { CacheOperationGate opGate = onEnter(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 18efb388fe33c..c5acf90b3b25c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -472,6 +472,7 @@ public void active(boolean active) { false, null, false, + null, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); @@ -486,6 +487,7 @@ public void active(boolean active) { false, null, false, + null, null); return new GridCacheProxyImpl<>((GridCacheContext)ctx, (GridCacheAdapter)this, opCtx); @@ -507,6 +509,7 @@ public void active(boolean active) { false, null, false, + null, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); @@ -521,6 +524,7 @@ public void active(boolean active) { true, null, false, + null, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index dc200ec355759..e23b3019e3aa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -267,6 +267,7 @@ public IgniteInternalCache delegate() { false, null, false, + null, null)); } finally { @@ -288,6 +289,7 @@ public IgniteInternalCache delegate() { false, null, false, + null, null)); } @@ -1537,6 +1539,7 @@ public IgniteInternalCache delegate() { false, null, false, + null, null)); } finally { @@ -1557,6 +1560,7 @@ public IgniteInternalCache delegate() { true, null, false, + null, null)); } finally { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index e2206a0669cdf..92971485a507f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -20,6 +20,7 @@ import java.io.Externalizable; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.FieldsQueryCursor; @@ -86,6 +87,12 @@ public interface IgniteCacheProxy extends IgniteCache, Externalizabl */ public IgniteCache skipStore(); + /** + * @param appAttrs Application attributes. + * @return Cache with application attributes. + */ + public IgniteCache withApplicationAttributes(Map appAttrs); + /** * @return Internal proxy. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 9f319f0cff99a..ba68175861441 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -2054,6 +2054,11 @@ else if (clazz.isAssignableFrom(IgniteEx.class)) { throw new UnsupportedOperationException(); } + /** */ + @Override public IgniteCache withApplicationAttributes(Map appAttrs) { + throw new UnsupportedOperationException(); + } + /** * Method converts exception to IgniteCacheRestartingException in case of cache restarting * or to CacheException in other cases. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 4a931a4d98911..6edb0f55643a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -440,6 +440,7 @@ protected GridCacheQueueAdapter(String queueName, GridCacheQueueHeader hdr, Grid false, null, false, + null, null) : opCtx.keepBinary(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index e90aabb8890a9..740c507584e37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.binary.BinaryMetadata; +import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.query.index.IndexQueryProcessor; import org.apache.ignite.internal.cache.query.index.IndexQueryResult; @@ -76,6 +77,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; @@ -149,6 +151,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.session.SessionContext; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.thread.IgniteThread; @@ -3069,12 +3072,23 @@ public List>> querySqlFields( failOnMultipleStmts ); + SessionContext sesCtx = null; + + if (cctx != null) { + CacheOperationContext opCtx = cctx.operationContextPerCall(); + + if (opCtx != null && opCtx.applicationAttributes() != null) + sesCtx = new SessionContextImpl(opCtx.applicationAttributes()); + } + QueryContext qryCtx = QueryContext.of( qry, cliCtx, cancel, qryProps, - userTx == null ? null : userTx.xidVersion() + userTx == null ? null : userTx.xidVersion(), + ctx.resource(), + sesCtx ); if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SchemaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SchemaManager.java index 1e32b7ffae5aa..d7e98885fda4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SchemaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SchemaManager.java @@ -509,8 +509,8 @@ private void createSqlFunctions(String schema, Class[] clss) throws IgniteChe if (ann != null) { int modifiers = m.getModifiers(); - if (!Modifier.isStatic(modifiers) || !Modifier.isPublic(modifiers)) - throw new IgniteCheckedException("Method " + m.getName() + " must be public static."); + if (!Modifier.isPublic(modifiers)) + throw new IgniteCheckedException("Method " + m.getName() + " must be public."); String alias = ann.alias().isEmpty() ? m.getName() : ann.alias(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java index e748be67b0e09..c49254e54bfa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java @@ -44,6 +44,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.resources.ServiceContextResource; import org.apache.ignite.resources.ServiceResource; +import org.apache.ignite.resources.SessionContextProviderResource; import org.apache.ignite.resources.SpringApplicationContextResource; import org.apache.ignite.resources.SpringResource; import org.apache.ignite.resources.TaskContinuousMapperResource; @@ -510,7 +511,10 @@ enum ResourceAnnotation { CACHE_STORE_SESSION(CacheStoreSessionResource.class), /** */ - SERVICE_CONTEXT(ServiceContextResource.class); + SERVICE_CONTEXT(ServiceContextResource.class), + + /** */ + SESSION_CONTEXT_PROVIDER(SessionContextProviderResource.class); /** */ public final Class clazz; @@ -570,6 +574,11 @@ public enum AnnotationSet { ResourceAnnotation.IGNITE_INSTANCE, ResourceAnnotation.LOGGER, ResourceAnnotation.SERVICE + ), + + /** */ + USER_DEFINED_FUNCTION( + ResourceAnnotation.SESSION_CONTEXT_PROVIDER ); /** Resource annotations bits for fast checks. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java index e3c16e7043662..219960564440a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java @@ -40,6 +40,7 @@ import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.session.SessionContextProvider; import org.apache.ignite.spi.IgniteSpi; import org.jetbrains.annotations.Nullable; @@ -230,6 +231,16 @@ public void injectGeneric(Object obj) throws IgniteCheckedException { inject(obj, GridResourceIoc.AnnotationSet.GENERIC); } + /** + * Inject resources to object contained a user defined function. + * + * @param sesCtxProv Session context provider. + * @throws IgniteCheckedException If failed to inject. + */ + public void injectToUdf(Object obj, SessionContextProvider sesCtxProv) throws IgniteCheckedException { + inject(obj, GridResourceIoc.AnnotationSet.USER_DEFINED_FUNCTION, sesCtxProv); + } + /** * @param obj Object to inject. * @param annSet Supported annotations. @@ -338,6 +349,10 @@ private GridResourceInjector injectorByAnnotation(GridResourceIoc.ResourceAnnota res = new GridResourceJobContextInjector((ComputeJobContext)param); break; + case SESSION_CONTEXT_PROVIDER: + res = new GridResourceSessionContextProviderInjector((SessionContextProvider)param); + break; + default: res = injectorByAnnotation[ann.ordinal()]; break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSessionContextProviderInjector.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSessionContextProviderInjector.java new file mode 100644 index 0000000000000..e7b1a48231a84 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSessionContextProviderInjector.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.resource; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.session.SessionContextProvider; + +/** {@link SessionContextProvider} injector. */ +public class GridResourceSessionContextProviderInjector extends GridResourceBasicInjector { + /** */ + public GridResourceSessionContextProviderInjector(SessionContextProvider rsrc) { + super(rsrc); + } + + /** {@inheritDoc} */ + @Override public void inject(GridResourceField fld, Object target, Class depCls, GridDeployment dep) throws IgniteCheckedException { + super.inject(fld, target, depCls, dep); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/resources/SessionContextProviderResource.java b/modules/core/src/main/java/org/apache/ignite/resources/SessionContextProviderResource.java new file mode 100644 index 0000000000000..e5921794ef367 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/resources/SessionContextProviderResource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.resources; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.session.SessionContext; + +/** + * Annotates a field for injecting a {@link SessionContext} into a user defined functions. + *

+ * Here is how injection would typically happen: + *

+ *     public class MyFunctions {
+ *         @SessionContextProviderResource
+ *         public SessionContextProvider sesCtxProv;
+ *
+ *         @QuerySqlFunction
+ *         public String sessionId() {
+ *             SessionContext sesCtx = sesCtxProv.getSessionContext();
+ *
+ *             return sesCtx.getAttribute("SESSION_ID");
+ *         }
+ *     }
+ * 
+ *

+ * Note, only {@link QuerySqlFunction} in the Calcite engine is supported. + * + * @see SessionContext + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface SessionContextProviderResource { + // No-op. +} diff --git a/modules/core/src/main/java/org/apache/ignite/session/SessionContext.java b/modules/core/src/main/java/org/apache/ignite/session/SessionContext.java new file mode 100644 index 0000000000000..f91221f6045b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/session/SessionContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.session; + +import org.apache.ignite.Ignite; +import org.apache.ignite.resources.SessionContextProviderResource; +import org.jetbrains.annotations.Nullable; + +/** + * Provides access to attributes set with {@link Ignite#withApplicationAttributes}. + * + * @see SessionContextProviderResource + */ +public interface SessionContext { + /** + * @param name Attribute name. + * @return Attribute value, or {@code null} if not speicifed. + .*/ + public @Nullable String getAttribute(String name); +} diff --git a/modules/core/src/main/java/org/apache/ignite/session/SessionContextProvider.java b/modules/core/src/main/java/org/apache/ignite/session/SessionContextProvider.java new file mode 100644 index 0000000000000..14c454790fce7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/session/SessionContextProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.session; + +import org.apache.ignite.resources.SessionContextProviderResource; + +/** + * Provides access to session context. + * + * @see SessionContextProviderResource + */ +public interface SessionContextProvider { + /** @return Session context, never {@code null}. */ + public SessionContext getSessionContext(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/session/IgniteWithApplicationAttributesAwareTest.java b/modules/core/src/test/java/org/apache/ignite/session/IgniteWithApplicationAttributesAwareTest.java new file mode 100644 index 0000000000000..b4e72635933d5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/session/IgniteWithApplicationAttributesAwareTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.session; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public class IgniteWithApplicationAttributesAwareTest extends GridCommonAbstractTest { + /** */ + private static final String DEFAULT_CACHE_NAME0 = "default-0"; + + /** */ + private static final String DEFAULT_CACHE_CFG_NAME = "default-cfg"; + + /** */ + private static final String DEFAULT_CACHE_CFG_NAME0 = "default-cfg-0"; + + /** */ + @Test + public void testMultipleWithApplicationAttributes() throws Exception { + try (Ignite ign = startGrid()) { + Map attrs = F.asMap("0", "0"); + + Ignite appIgn = ign.withApplicationAttributes(attrs); + Ignite appIgn0 = appIgn.withApplicationAttributes(attrs); + + assertSame(appIgn, appIgn0); + + Ignite appIgn1 = appIgn.withApplicationAttributes(F.asMap("1", "1")); + + assertNotSame(appIgn, appIgn1); + } + } + + /** */ + @Test + public void testNonExistentCache() throws Exception { + try (Ignite ign = startGrid()) { + Ignite appIgn = ign.withApplicationAttributes(F.asMap("0", "0")); + + assertNull(appIgn.cache(DEFAULT_CACHE_NAME)); + } + } + + /** */ + @Test + public void testReturnSameCacheReference() throws Exception { + try (Ignite ign = startGrid()) { + Ignite appIgn = ign.withApplicationAttributes(F.asMap("0", "0")); + + Map> caches = new HashMap<>(); + + caches.computeIfAbsent(DEFAULT_CACHE_NAME, appIgn::createCache); + caches.computeIfAbsent(DEFAULT_CACHE_CFG_NAME, c -> appIgn.createCache(new CacheConfiguration<>(c))); + caches.computeIfAbsent(DEFAULT_CACHE_NAME0, appIgn::getOrCreateCache); + caches.computeIfAbsent(DEFAULT_CACHE_CFG_NAME0, c -> appIgn.getOrCreateCache(new CacheConfiguration<>(c))); + + for (Map.Entry> cache: caches.entrySet()) { + IgniteCache c0 = appIgn.cache(cache.getKey()); + IgniteCache c1 = appIgn.getOrCreateCache(cache.getKey()); + IgniteCache c2 = appIgn.getOrCreateCache(new CacheConfiguration<>(cache.getKey())); + IgniteCache c3 = appIgn.getOrCreateCaches(F.asList(new CacheConfiguration<>(cache.getKey()))) + .stream().findFirst().get(); + + assertSame(cache.getValue(), c0); + assertSame(cache.getValue(), c1); + assertSame(cache.getValue(), c2); + assertSame(cache.getValue(), c3); + } + } + } + + /** */ + @Test + public void testCreateAndGetMultipleCaches() throws Exception { + try (Ignite ign = startGrid()) { + Ignite appIgn = ign.withApplicationAttributes(F.asMap("0", "0")); + + List crtCaches = (List)appIgn.createCaches(F.asList( + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME), + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME0))); + + List getOrCrtCaches = (List)appIgn.getOrCreateCaches(F.asList( + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME), + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME0))); + + IgniteCache cache = appIgn.cache(DEFAULT_CACHE_CFG_NAME); + IgniteCache cache0 = appIgn.cache(DEFAULT_CACHE_CFG_NAME0); + + assertSame(crtCaches.get(0), getOrCrtCaches.get(0)); + assertSame(crtCaches.get(1), getOrCrtCaches.get(1)); + assertSame(crtCaches.get(0), cache); + assertSame(crtCaches.get(1), cache0); + } + } + + /** */ + @Test + public void testDestroyCacheCleansReference() throws Exception { + try (Ignite ign = startGrid()) { + Ignite appIgn = ign.withApplicationAttributes(F.asMap("0", "0")); + + appIgn.createCache(DEFAULT_CACHE_NAME); + appIgn.destroyCache(DEFAULT_CACHE_NAME); + + assertNull(appIgn.cache(DEFAULT_CACHE_NAME)); + + appIgn.createCaches(F.asList( + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME), + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME0))); + + appIgn.destroyCaches(F.asList(DEFAULT_CACHE_CFG_NAME, DEFAULT_CACHE_CFG_NAME0)); + + assertNull(appIgn.cache(DEFAULT_CACHE_CFG_NAME)); + assertNull(appIgn.cache(DEFAULT_CACHE_CFG_NAME0)); + } + } + + /** */ + @Test + public void testNearCacheReferences() throws Exception { + try (Ignite ign = startGrid(0); Ignite cln = startClientGrid(1)) { + Ignite appClnIgn = cln.withApplicationAttributes(F.asMap("0", "0")); + + // Check createCache. + IgniteCache cfgCache = appClnIgn.createCache( + new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME), new NearCacheConfiguration<>()); + + IgniteCache c0 = appClnIgn.cache(DEFAULT_CACHE_CFG_NAME); + IgniteCache c1 = appClnIgn.getOrCreateNearCache(DEFAULT_CACHE_CFG_NAME, new NearCacheConfiguration<>()); + + assertSame(cfgCache, c0); + assertSame(cfgCache, c1); + + // Check getOrCreateCache. + IgniteCache dftCache = appClnIgn.getOrCreateCache( + new CacheConfiguration<>(DEFAULT_CACHE_NAME), new NearCacheConfiguration<>()); + + IgniteCache c2 = appClnIgn.cache(DEFAULT_CACHE_NAME); + IgniteCache c3 = appClnIgn.getOrCreateNearCache(DEFAULT_CACHE_NAME, new NearCacheConfiguration<>()); + + assertSame(dftCache, c2); + assertSame(dftCache, c3); + + // Check createNearCache. + ign.createCache(new CacheConfiguration<>(DEFAULT_CACHE_CFG_NAME0)); + IgniteCache cache = appClnIgn.createNearCache(DEFAULT_CACHE_CFG_NAME0, new NearCacheConfiguration<>()); + + IgniteCache c4 = appClnIgn.cache(DEFAULT_CACHE_CFG_NAME0); + + assertSame(cache, c4); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index d71a41556fddb..f308f61a89e4e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -18,12 +18,14 @@ package org.apache.ignite.testframework.junits; import java.util.Collection; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import javax.cache.CacheException; import javax.management.MBeanServer; import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.DataRegionMetricsAdapter; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; @@ -602,6 +604,11 @@ public IgniteMock( return NoopTracingConfigurationManager.INSTANCE; } + /** {@inheritDoc} */ + @Override public Ignite withApplicationAttributes(Map attrs) { + return null; + } + /** {@inheritDoc} */ @Override public Collection memoryMetrics() { return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics()); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index 6a5760d97c051..d37200cdd591e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -855,6 +856,11 @@ public void kill() throws Exception { throw new UnsupportedOperationException("Operation isn't supported yet."); } + /** {@inheritDoc} */ + @Override public Ignite withApplicationAttributes(Map attrs) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + /** {@inheritDoc} */ @Override public void close() throws IgniteException { if (localJvmGrid() != null) { diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTaskSessionSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTaskSessionSelfTestSuite.java index 3412897a49bfe..39e3743d6d415 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTaskSessionSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTaskSessionSelfTestSuite.java @@ -36,6 +36,7 @@ import org.apache.ignite.session.GridSessionSetTaskAttributeSelfTest; import org.apache.ignite.session.GridSessionTaskWaitJobAttributeSelfTest; import org.apache.ignite.session.GridSessionWaitAttributeSelfTest; +import org.apache.ignite.session.IgniteWithApplicationAttributesAwareTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -62,7 +63,8 @@ GridSessionJobFailoverSelfTest.class, GridSessionLoadSelfTest.class, GridSessionCollisionSpiSelfTest.class, - GridSessionCheckpointSelfTest.class + GridSessionCheckpointSelfTest.class, + IgniteWithApplicationAttributesAwareTest.class }) public class IgniteTaskSessionSelfTestSuite { } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2SchemaManager.java index e5e45f7299423..73f29291083f8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2SchemaManager.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2SchemaManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.sql.SQLException; import java.sql.Statement; import java.util.List; @@ -221,6 +222,12 @@ private void dropTable(H2TableDescriptor tbl) { /** {@inheritDoc} */ @Override public void onFunctionCreated(String schema, String name, boolean deterministic, Method method) { + if (!Modifier.isStatic(method.getModifiers())) { + log.warning("Skip creating SQL function '" + name + "' in H2 engine because it is not static."); + + return; + } + try { createSqlFunction(schema, name, deterministic, method.getDeclaringClass().getName() + '.' + method.getName()); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java index 1cfd895fac456..197f12b6e449a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java @@ -559,7 +559,7 @@ public static CacheOperationContext setKeepBinaryContext(GridCacheContext if (opCtx == null) // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary - newOpCtx = new CacheOperationContext(false, true, null, false, null, false, null); + newOpCtx = new CacheOperationContext(false, true, null, false, null, false, null, null); else if (!opCtx.isKeepBinary()) newOpCtx = opCtx.keepBinary(); diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 349e77227825f..07934a8dc164a 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -22,6 +22,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterGroup; @@ -332,6 +333,13 @@ public ApplicationContext getApplicationContext() throws BeansException { return g.tracingConfiguration(); } + /** {@inheritDoc} */ + @Override public Ignite withApplicationAttributes(Map attrs) { + checkIgnite(); + + return g.withApplicationAttributes(attrs); + } + /** {@inheritDoc} */ @Override public Collection memoryMetrics() { return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics()); diff --git a/parent/pom.xml b/parent/pom.xml index 82b8040bbcf02..4bbffbe7b6db7 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -393,6 +393,10 @@ Development Utils org.apache.ignite.development.utils* + + SessionContext API + org.apache.ignite.session +