From fc2c3401ba54c1f85acf33145b9d82eaf93150e0 Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Sat, 16 Nov 2024 20:17:45 +0300 Subject: [PATCH] WIP --- .../calcite/exec/ExecutionServiceImpl.java | 3 +- .../calcite/exec/QueryTaskExecutorImpl.java | 1 - .../ignite/internal/GridKernalContext.java | 6 - .../internal/GridKernalContextImpl.java | 12 -- .../apache/ignite/internal/IgniteKernal.java | 2 - .../cache/context/SessionContextImpl.java | 44 +++++++ .../context/SessionContextProcessor.java | 121 ------------------ .../processors/cache/GridCacheContext.java | 5 - .../reader/StandaloneGridKernalContext.java | 6 - .../processors/pool/PoolProcessor.java | 1 - .../processors/query/GridQueryProcessor.java | 14 +- .../thread/SecurityAwareRunnable.java | 32 +---- ...ecurityAwareStripedThreadPoolExecutor.java | 10 +- 13 files changed, 65 insertions(+), 192 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextProcessor.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 5d0e7bd45dfad..e561db92496a4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -41,6 +41,7 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -853,7 +854,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) { ); final BaseQueryContext qctx = createQueryContext( - Contexts.of(ctx.resource(), ctx.sessionContext().context(msg.appAttrs())), + Contexts.of(ctx.resource(), msg.appAttrs() == null ? null : new SessionContextImpl(msg.appAttrs())), msg.schema()); QueryPlan qryPlan = queryPlanCache().queryPlan( diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java index e3c0933f6aa6f..380d4ff7a9d96 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java @@ -87,7 +87,6 @@ public void exceptionHandler(Thread.UncaughtExceptionHandler eHnd) { IgniteStripedThreadPoolExecutor executor = new SecurityAwareStripedThreadPoolExecutor( ctx.security(), - ctx.sessionContext(), ctx.config().getQueryThreadPoolSize(), ctx.igniteInstanceName(), "calciteQry", diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 4161ec1bd7107..f48693a0459c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -23,7 +23,6 @@ import java.util.concurrent.Executor; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; @@ -646,9 +645,4 @@ public interface GridKernalContext extends Iterable { * @return Executor that is in charge of processing user async continuations. */ public Executor getAsyncContinuationExecutor(); - - /** - * @return Session context processor. - */ - public SessionContextProcessor sessionContext(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 6a40a920f4002..49d7c5204c6d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -38,7 +38,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.FailureType; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; import org.apache.ignite.internal.maintenance.MaintenanceProcessor; @@ -176,10 +175,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private IndexProcessor indexProc; - /** */ - @GridToStringExclude - private SessionContextProcessor sesCtxProc; - /** */ @GridToStringExclude private GridEncryptionManager encryptionMgr; @@ -594,8 +589,6 @@ else if (comp instanceof PerformanceStatisticsProcessor) perfStatProc = (PerformanceStatisticsProcessor)comp; else if (comp instanceof IndexProcessor) indexProc = (IndexProcessor)comp; - else if (comp instanceof SessionContextProcessor) - sesCtxProc = (SessionContextProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor || comp instanceof QueryEngine)) @@ -1109,9 +1102,4 @@ public void recoveryMode(boolean recoveryMode) { ? ForkJoinPool.commonPool() : config().getAsyncContinuationExecutor(); } - - /** {@inheritDoc} */ - @Override public SessionContextProcessor sessionContext() { - return sesCtxProc; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f825b3df81540..2afbbd235cad8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -90,7 +90,6 @@ import org.apache.ignite.internal.binary.BinaryEnumCache; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryUtils; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; @@ -1105,7 +1104,6 @@ public void start( startProcessor(new DistributedMetaStorageImpl(ctx)); startProcessor(new DistributedConfigurationProcessor(ctx)); startProcessor(new DurableBackgroundTasksProcessor(ctx)); - startProcessor(new SessionContextProcessor(ctx)); CacheObjectTransformerProcessor transProc = createComponent(CacheObjectTransformerProcessor.class, ctx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java new file mode 100644 index 0000000000000..a865ad884a6f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextImpl.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.context; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.cache.SessionContext; +import org.jetbrains.annotations.Nullable; + +/** */ +public class SessionContextImpl implements SessionContext { + /** Session attributes. */ + private final Map attrs; + + /** @param attrs Session attributes. */ + public SessionContextImpl(Map attrs) { + this.attrs = new HashMap<>(attrs); + } + + /** {@inheritDoc} */ + @Override public @Nullable String getAttribute(String name) { + return attrs.get(name); + } + + /** */ + @Override public Map getAttributes() { + return attrs; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextProcessor.java deleted file mode 100644 index 41a5b28ef6822..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/cache/context/SessionContextProcessor.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.cache.context; - -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.cache.SessionContext; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.jetbrains.annotations.Nullable; - -/** Processor for handling session context. */ -public class SessionContextProcessor extends GridProcessorAdapter { - /** Holds session context for current thread. */ - private final ThreadLocal ctx = new ThreadLocal<>(); - - /** */ - public SessionContextProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** - * Set context for current thread. - * - * @param sesAttrs Session attributes to set. - */ - public AutoCloseable withContext(@Nullable Map sesAttrs) { - if (sesAttrs == null) - return null; - - SessionContextCloseable sesCtx = new SessionContextCloseable(sesAttrs); - - ctx.set(sesCtx); - - return sesCtx; - } - - /** */ - public SessionContext context(@Nullable Map sesAttrs) { - return new SessionContextImpl(sesAttrs); - } - - /** @return Session context for current thread. */ - public @Nullable SessionContext context() { - return ctx.get(); - } - - /** Clears session context for current thread. */ - public void clear() { - ctx.remove(); - } - - /** @return Session attributes for current context. */ - public @Nullable Map attributes() { - SessionContextCloseable ses = (SessionContextCloseable)ctx.get(); - - return ses == null ? null : ses.attrs; - } - - /** */ - private class SessionContextCloseable implements SessionContext, AutoCloseable { - /** Session attributes. */ - private final Map attrs; - - /** @param attrs Session attributes. */ - public SessionContextCloseable(Map attrs) { - this.attrs = new HashMap<>(attrs); - } - - /** {@inheritDoc} */ - @Override public @Nullable String getAttribute(String name) { - return attrs.get(name); - } - - /** */ - @Override public Map getAttributes() { - return attrs; - } - - /** Clears thread local session context. */ - @Override public void close() { - ctx.remove(); - } - } - - /** */ - private class SessionContextImpl implements SessionContext { - /** Session attributes. */ - private final Map attrs; - - /** @param attrs Session attributes. */ - public SessionContextImpl(Map attrs) { - this.attrs = new HashMap<>(attrs); - } - - /** {@inheritDoc} */ - @Override public @Nullable String getAttribute(String name) { - return attrs.get(name); - } - - /** */ - @Override public Map getAttributes() { - return attrs; - } - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index d404c424ff0d6..dc5b8a7b2cf2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1339,11 +1339,6 @@ public void operationContextPerCall(@Nullable CacheOperationContext opCtx) { dht().near().context().opCtxPerCall.set(opCtx); else opCtxPerCall.set(opCtx); - - if (opCtx == null) - ctx.sessionContext().clear(); - else - ctx.sessionContext().withContext(opCtx.applicationAttributes()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index c98d3e0992608..a2da615224c43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.LongJVMPauseDetector; import org.apache.ignite.internal.MarshallerContextImpl; import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; @@ -735,11 +734,6 @@ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFie return null; } - /** {@inheritDoc} */ - @Override public SessionContextProcessor sessionContext() { - return null; - } - /** * @param kctx Kernal context. * @throws IgniteCheckedException In case of any error. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index ae384dfe17ebc..704067ca228dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -1101,7 +1101,6 @@ private IgniteStripedThreadPoolExecutor createStripedThreadPoolExecutor( return ctx.security().enabled() ? new SecurityAwareStripedThreadPoolExecutor( ctx.security(), - ctx.sessionContext(), concurrentLvl, igniteInstanceName, threadNamePrefix, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ec902b962822f..09446767adce6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -53,6 +53,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.SessionContext; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.binary.BinaryMetadata; +import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.query.index.IndexQueryProcessor; import org.apache.ignite.internal.cache.query.index.IndexQueryResult; @@ -75,6 +77,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; @@ -3067,8 +3070,17 @@ public List>> querySqlFields( ); } else { + SessionContext sesCtx = null; + + if (cctx != null) { + CacheOperationContext opCtx = cctx.operationContextPerCall();; + + if (opCtx != null && opCtx.applicationAttributes() != null) + sesCtx = new SessionContextImpl(opCtx.applicationAttributes()); + } + res = qryEngine.query( - QueryContext.of(qry, cliCtx, cancel, qryProps, ctx.resource(), ctx.sessionContext().context()), + QueryContext.of(qry, cliCtx, cancel, qryProps, ctx.resource(), sesCtx), schemaName, qry.getSql(), qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java index eab9b32529d3f..011d53cc33a82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java @@ -17,13 +17,9 @@ package org.apache.ignite.internal.processors.security.thread; -import java.util.Map; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.processors.security.IgniteSecurity; import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; -import org.jetbrains.annotations.Nullable; /** * Represents a {@link Runnable} wrapper that executes the original {@link Runnable} with the security context @@ -36,51 +32,31 @@ class SecurityAwareRunnable implements Runnable { /** */ private final IgniteSecurity security; - /** */ - private final @Nullable SessionContextProcessor sesCtxProc; - /** */ private final SecurityContext secCtx; /** */ - private final @Nullable Map sesAttrs; - - /** */ - private SecurityAwareRunnable(IgniteSecurity security, @Nullable SessionContextProcessor sesCtxProc, Runnable delegate) { + private SecurityAwareRunnable(IgniteSecurity security, Runnable delegate) { assert delegate != null; this.delegate = delegate; this.security = security; - this.sesCtxProc = sesCtxProc; secCtx = security.securityContext(); - - sesAttrs = sesCtxProc == null ? null : sesCtxProc.attributes(); } /** {@inheritDoc} */ @Override public void run() { - try ( - OperationSecurityContext ignored = security.withContext(secCtx); - AutoCloseable ignored0 = sesCtxProc == null ? null : sesCtxProc.withContext(sesAttrs) - ) { + try (OperationSecurityContext ignored = security.withContext(secCtx)) { delegate.run(); } - catch (Exception e) { - throw new IgniteException("Failed to close SessionContext", e); - } } /** */ static Runnable of(IgniteSecurity security, Runnable delegate) { - return of(security, null, delegate); - } - - /** */ - static Runnable of(IgniteSecurity security, SessionContextProcessor sesCtxProc, Runnable delegate) { - if (delegate == null || (security.isDefaultContext() && sesCtxProc != null && sesCtxProc.context() == null)) + if (delegate == null || (security.isDefaultContext())) return delegate; - return new SecurityAwareRunnable(security, sesCtxProc, delegate); + return new SecurityAwareRunnable(security, delegate); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java index d681f34828836..3eadf4d6baa86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.security.thread; -import org.apache.ignite.internal.cache.context.SessionContextProcessor; import org.apache.ignite.internal.processors.security.IgniteSecurity; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; @@ -29,14 +28,10 @@ public class SecurityAwareStripedThreadPoolExecutor extends IgniteStripedThreadP /** */ private final IgniteSecurity security; - /** */ - private final SessionContextProcessor sesCtx; - /** */ public SecurityAwareStripedThreadPoolExecutor( IgniteSecurity security, - SessionContextProcessor sesCtx, - int concurrentLvl, + int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, @@ -44,12 +39,11 @@ public SecurityAwareStripedThreadPoolExecutor( long keepAliveTime ) { super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime); - this.sesCtx = sesCtx; this.security = security; } /** {@inheritDoc} */ @Override public void execute(Runnable task, int idx) { - super.execute(SecurityAwareRunnable.of(security, sesCtx, task), idx); + super.execute(SecurityAwareRunnable.of(security, task), idx); } }