Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23392 Add SessionContext API #11618

Open
wants to merge 29 commits into
base: application_context
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.running.TrackableQuery;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.session.SessionContext;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -159,8 +161,19 @@ public RootQuery(
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), ctx.isForcedJoinOrder(),
ctx.partitions(), exch, unregister, log, plannerTimeout, totalTimeout);
return new RootQuery<>(
sql,
schema,
params,
QueryContext.of(cancel, ctx.unwrap(GridResourceProcessor.class), ctx.unwrap(SessionContext.class)),
ctx.isLocal(),
ctx.isForcedJoinOrder(),
ctx.partitions(),
exch,
unregister,
log,
plannerTimeout,
totalTimeout);
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -46,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ReflectiveCallNotNullImplementor;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
Expand All @@ -59,10 +61,13 @@
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.session.SessionContext;
import org.apache.ignite.session.SessionContextProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -78,6 +83,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** Placeholder for NULL values in search bounds. */
private static final Object NULL_BOUND = new Object();

/** Emtpy session context. */
private static final SessionContext EMPTY_SESSION_CONTEXT = (attrName) -> null;

/** */
private final UUID qryId;

Expand Down Expand Up @@ -126,6 +134,12 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final long startTs;

/** Map associates UDF name to instance of class that contains this UDF. */
private final Map<String, Object> udfObjs = new ConcurrentHashMap<>();

/** Session context provider injected into UDF targets. */
private final SessionContextProvider sesCtxProv = new SessionContextProviderImpl();

/** */
private Object[] correlations = new Object[16];

Expand Down Expand Up @@ -457,6 +471,31 @@ public IoTracker ioTracker() {
return ioTracker;
}

/**
* Return an object contained a user defined function. If not exist yet, then instantiate the object and inject resources into it.
* Used by {@link ReflectiveCallNotNullImplementor} while it is preparing user function call.
*
* @param udfClsName Classname of the class contained UDF.
* @return Object with injected resources.
*/
public Object udfObject(String udfClsName) {
return udfObjs.computeIfAbsent(udfClsName, ignore -> {
try {
Class<?> funcCls = getClass().getClassLoader().loadClass(udfClsName);

Object target = funcCls.getConstructor().newInstance();

unwrap(GridResourceProcessor.class).injectToUdf(target, sesCtxProv);

return target;
}
catch (Exception e) {
throw new IgniteException("Failed to instantiate an object for UDF. " +
"Class " + udfClsName + " must have public zero-args constructor.", e);
}
});
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
Expand All @@ -473,4 +512,14 @@ public IoTracker ioTracker() {
@Override public int hashCode() {
return Objects.hash(qryId, fragmentDesc.fragmentId());
}

/** */
private class SessionContextProviderImpl implements SessionContextProvider {
/** {@inheritDoc} */
@Override public @Nullable SessionContext getSessionContext() {
SessionContext ctx = unwrap(SessionContext.class);

return ctx == null ? EMPTY_SESSION_CONTEXT : ctx;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -653,6 +654,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qry.onResponse(nodeId, fragment.fragmentId(), ex);
else {
try {
SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);

QueryStartRequest req = new QueryStartRequest(
qry.id(),
qry.localQueryId(),
Expand All @@ -664,7 +667,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qry.parameters(),
parametersMarshalled,
timeout,
ectx.getQryTxEntries()
ectx.getQryTxEntries(),
sesCtx == null ? null : sesCtx.attributes()
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -851,7 +855,9 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
)
);

final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
final BaseQueryContext qctx = createQueryContext(
Contexts.of(ctx.resource(), msg.appAttrs() == null ? null : new SessionContextImpl(msg.appAttrs())),
msg.schema());

QueryPlan qryPlan = queryPlanCache().queryPlan(
new CacheKey(msg.schema(), msg.root()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand Down Expand Up @@ -47,8 +46,6 @@ private IgniteScalarFunction(Method method, CallImplementor implementor) {
* @return Created {@link ScalarFunction}.
*/
public static ScalarFunction create(Method method) {
assert Modifier.isStatic(method.getModifiers());

CallImplementor implementor = RexImpTable.createImplementor(
new ReflectiveCallNotNullImplementor(method), NullPolicy.NONE, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rex.RexCall;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;

/**
* Implementation of {@link NotNullImplementor} that calls a given {@link Method}.
Expand Down Expand Up @@ -55,10 +57,13 @@ public ReflectiveCallNotNullImplementor(Method method) {
callExpr = Expressions.call(method, translatedOperands);

else {
// The UDF class must have a public zero-args constructor.
// Assume that the validator checked already.
final Expression target =
Expressions.new_(method.getDeclaringClass());
final Expression target = Expressions.convert_(
Expressions.call(
translator.getRoot(),
Types.lookupMethod(ExecutionContext.class, "udfObject", String.class),
Expressions.constant(method.getDeclaringClass().getName())),
method.getDeclaringClass());

callExpr = Expressions.call(target, method, translatedOperands);
}
if (!containsCheckedException(method))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
Expand Down Expand Up @@ -71,6 +72,9 @@ public class QueryStartRequest implements MarshalableMessage, ExecutionContextAw
@GridDirectCollection(QueryTxEntry.class)
private @Nullable Collection<QueryTxEntry> qryTxEntries;

/** */
private Map<String, String> appAttrs;

/** */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public QueryStartRequest(
Expand All @@ -84,7 +88,8 @@ public QueryStartRequest(
Object[] params,
@Nullable byte[] paramsBytes,
long timeout,
Collection<QueryTxEntry> qryTxEntries
Collection<QueryTxEntry> qryTxEntries,
@Nullable Map<String, String> appAttrs
) {
this.qryId = qryId;
this.originatingQryId = originatingQryId;
Expand All @@ -97,6 +102,7 @@ public QueryStartRequest(
this.paramsBytes = paramsBytes; // If we already have marshalled params, use it.
this.timeout = timeout;
this.qryTxEntries = qryTxEntries;
this.appAttrs = appAttrs;
}

/** */
Expand Down Expand Up @@ -184,6 +190,11 @@ public long timeout() {
return qryTxEntries;
}

/** */
public Map<String, String> appAttrs() {
return appAttrs;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (paramsBytes == null && params != null)
Expand Down Expand Up @@ -284,6 +295,11 @@ public long timeout() {

writer.incrementState();

case 10:
if (!writer.writeMap("appAttrs", appAttrs, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING))
return false;

writer.incrementState();
}

return true;
Expand Down Expand Up @@ -377,6 +393,14 @@ public long timeout() {

reader.incrementState();

case 10:
appAttrs = reader.readMap("appAttrs", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false);

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return reader.afterMessageRead(QueryStartRequest.class);
Expand All @@ -389,6 +413,6 @@ public long timeout() {

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 10;
return 11;
}
}
Loading