Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim committed Nov 16, 2024
1 parent 9b0c2c8 commit fc2c340
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -853,7 +854,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
);

final BaseQueryContext qctx = createQueryContext(
Contexts.of(ctx.resource(), ctx.sessionContext().context(msg.appAttrs())),
Contexts.of(ctx.resource(), msg.appAttrs() == null ? null : new SessionContextImpl(msg.appAttrs())),
msg.schema());

QueryPlan qryPlan = queryPlanCache().queryPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void exceptionHandler(Thread.UncaughtExceptionHandler eHnd) {

IgniteStripedThreadPoolExecutor executor = new SecurityAwareStripedThreadPoolExecutor(
ctx.security(),
ctx.sessionContext(),
ctx.config().getQueryThreadPoolSize(),
ctx.igniteInstanceName(),
"calciteQry",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.Executor;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.cache.context.SessionContextProcessor;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
Expand Down Expand Up @@ -646,9 +645,4 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Executor that is in charge of processing user async continuations.
*/
public Executor getAsyncContinuationExecutor();

/**
* @return Session context processor.
*/
public SessionContextProcessor sessionContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cache.context.SessionContextProcessor;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
Expand Down Expand Up @@ -176,10 +175,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringExclude
private IndexProcessor indexProc;

/** */
@GridToStringExclude
private SessionContextProcessor sesCtxProc;

/** */
@GridToStringExclude
private GridEncryptionManager encryptionMgr;
Expand Down Expand Up @@ -594,8 +589,6 @@ else if (comp instanceof PerformanceStatisticsProcessor)
perfStatProc = (PerformanceStatisticsProcessor)comp;
else if (comp instanceof IndexProcessor)
indexProc = (IndexProcessor)comp;
else if (comp instanceof SessionContextProcessor)
sesCtxProc = (SessionContextProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor
|| comp instanceof QueryEngine))
Expand Down Expand Up @@ -1109,9 +1102,4 @@ public void recoveryMode(boolean recoveryMode) {
? ForkJoinPool.commonPool()
: config().getAsyncContinuationExecutor();
}

/** {@inheritDoc} */
@Override public SessionContextProcessor sessionContext() {
return sesCtxProc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.apache.ignite.internal.binary.BinaryEnumCache;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cache.context.SessionContextProcessor;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
Expand Down Expand Up @@ -1105,7 +1104,6 @@ public void start(
startProcessor(new DistributedMetaStorageImpl(ctx));
startProcessor(new DistributedConfigurationProcessor(ctx));
startProcessor(new DurableBackgroundTasksProcessor(ctx));
startProcessor(new SessionContextProcessor(ctx));

CacheObjectTransformerProcessor transProc = createComponent(CacheObjectTransformerProcessor.class, ctx);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.cache.context;

import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.cache.SessionContext;
import org.jetbrains.annotations.Nullable;

/** */
public class SessionContextImpl implements SessionContext {
/** Session attributes. */
private final Map<String, String> attrs;

/** @param attrs Session attributes. */
public SessionContextImpl(Map<String, String> attrs) {
this.attrs = new HashMap<>(attrs);
}

/** {@inheritDoc} */
@Override public @Nullable String getAttribute(String name) {
return attrs.get(name);
}

/** */
@Override public Map<String, String> getAttributes() {
return attrs;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1339,11 +1339,6 @@ public void operationContextPerCall(@Nullable CacheOperationContext opCtx) {
dht().near().context().opCtxPerCall.set(opCtx);
else
opCtxPerCall.set(opCtx);

if (opCtx == null)
ctx.sessionContext().clear();
else
ctx.sessionContext().withContext(opCtx.applicationAttributes());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.ignite.internal.LongJVMPauseDetector;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.cache.context.SessionContextProcessor;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
Expand Down Expand Up @@ -735,11 +734,6 @@ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFie
return null;
}

/** {@inheritDoc} */
@Override public SessionContextProcessor sessionContext() {
return null;
}

/**
* @param kctx Kernal context.
* @throws IgniteCheckedException In case of any error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,6 @@ private IgniteStripedThreadPoolExecutor createStripedThreadPoolExecutor(
return ctx.security().enabled()
? new SecurityAwareStripedThreadPoolExecutor(
ctx.security(),
ctx.sessionContext(),
concurrentLvl,
igniteInstanceName,
threadNamePrefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.SessionContext;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
Expand All @@ -66,6 +67,7 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.query.index.IndexQueryProcessor;
import org.apache.ignite.internal.cache.query.index.IndexQueryResult;
Expand All @@ -75,6 +77,7 @@
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
Expand Down Expand Up @@ -3067,8 +3070,17 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
);
}
else {
SessionContext sesCtx = null;

if (cctx != null) {
CacheOperationContext opCtx = cctx.operationContextPerCall();;

if (opCtx != null && opCtx.applicationAttributes() != null)
sesCtx = new SessionContextImpl(opCtx.applicationAttributes());
}

res = qryEngine.query(
QueryContext.of(qry, cliCtx, cancel, qryProps, ctx.resource(), ctx.sessionContext().context()),
QueryContext.of(qry, cliCtx, cancel, qryProps, ctx.resource(), sesCtx),
schemaName,
qry.getSql(),
qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY
Expand Down
Loading

0 comments on commit fc2c340

Please sign in to comment.