Skip to content

Commit

Permalink
Sent appAttrs only once
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim committed Nov 14, 2024
1 parent 4995763 commit 0f93a5c
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.SessionContext;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
Expand Down Expand Up @@ -649,6 +650,10 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qry.onResponse(nodeId, fragment.fragmentId(), ex);
else {
try {
SessionContext sesCtx = qry.context().unwrap(SessionContext.class);

Map<String, String> sesAttrs = sesCtx == null ? null : sesCtx.getAttributes();

QueryStartRequest req = new QueryStartRequest(
qry.id(),
qry.localQueryId(),
Expand All @@ -659,7 +664,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled,
timeout
timeout,
sesAttrs
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -847,7 +853,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
);

final BaseQueryContext qctx = createQueryContext(
Contexts.of(ctx.resource(), ctx.sessionContext().context()),
Contexts.of(ctx.resource(), ctx.sessionContext().context(msg.appAttrs())),
msg.schema());

QueryPlan qryPlan = queryPlanCache().queryPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -64,6 +66,9 @@ public class QueryStartRequest implements MarshalableMessage, ExecutionContextAw
/** */
private long timeout;

/** */
private Map<String, String> appAttrs;

/** */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public QueryStartRequest(
Expand All @@ -76,7 +81,8 @@ public QueryStartRequest(
int totalFragmentsCnt,
Object[] params,
@Nullable byte[] paramsBytes,
long timeout
long timeout,
@Nullable Map<String, String> appAttrs
) {
this.qryId = qryId;
this.originatingQryId = originatingQryId;
Expand All @@ -88,6 +94,7 @@ public QueryStartRequest(
this.params = params;
this.paramsBytes = paramsBytes; // If we already have marshalled params, use it.
this.timeout = timeout;
this.appAttrs = appAttrs;
}

/** */
Expand Down Expand Up @@ -168,6 +175,11 @@ public long timeout() {
return timeout;
}

/** */
public Map<String, String> appAttrs() {
return appAttrs;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (paramsBytes == null && params != null)
Expand Down Expand Up @@ -250,6 +262,11 @@ public long timeout() {

writer.incrementState();

case 9:
if (!writer.writeMap("appAttrs", appAttrs, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING))
return false;

writer.incrementState();
}

return true;
Expand Down Expand Up @@ -335,6 +352,14 @@ public long timeout() {

reader.incrementState();

case 9:
appAttrs = reader.readMap("appAttrs", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false);

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return reader.afterMessageRead(QueryStartRequest.class);
Expand All @@ -347,6 +372,6 @@ public long timeout() {

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 9;
return 10;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.cache;

import java.util.Map;
import org.apache.ignite.Ignite;
import org.jetbrains.annotations.Nullable;

Expand All @@ -29,4 +30,7 @@ public interface SessionContext {
* @return Attribute value, or {@code null} if not speicifed.
.*/
public @Nullable String getAttribute(String name);

/** */
public Map<String, String> getAttributes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public AutoCloseable withContext(@Nullable Map<String, String> sesAttrs) {
return sesCtx;
}

/** */
public SessionContext context(@Nullable Map<String, String> sesAttrs) {
return new SessionContextImpl(sesAttrs);
}

/** @return Session context for current thread. */
public @Nullable SessionContext context() {
return ctx.get();
Expand Down Expand Up @@ -82,9 +87,35 @@ public SessionContextCloseable(Map<String, String> attrs) {
return attrs.get(name);
}

/** */
@Override public Map<String, String> getAttributes() {
return attrs;
}

/** Clears thread local session context. */
@Override public void close() {
ctx.remove();
}
}

/** */
private class SessionContextImpl implements SessionContext {
/** Session attributes. */
private final Map<String, String> attrs;

/** @param attrs Session attributes. */
public SessionContextImpl(Map<String, String> attrs) {
this.attrs = new HashMap<>(attrs);
}

/** {@inheritDoc} */
@Override public @Nullable String getAttribute(String name) {
return attrs.get(name);
}

/** */
@Override public Map<String, String> getAttributes() {
return attrs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ private void processP2PMessage(

assert obj != null;

invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg), sessionAttributes(msg));
invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
}
finally {
threadProcessingMessage(false, null);
Expand Down Expand Up @@ -1511,7 +1511,7 @@ private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {

assert obj != null;

invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg), sessionAttributes(msg));
invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
}

/**
Expand Down Expand Up @@ -1876,15 +1876,13 @@ private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageLis
* @param nodeId Node ID.
* @param msg Message.
* @param secSubjId Security subject that will be used to open a security session.
* @param sesAttrs Session attributes.
*/
private void invokeListener(
Byte plc,
GridMessageListener lsnr,
UUID nodeId,
Object msg,
UUID secSubjId,
Map<String, String> sesAttrs
UUID secSubjId
) {
MTC.span().addLog(() -> "Invoke listener");

Expand All @@ -1899,7 +1897,6 @@ private void invokeListener(

try (
OperationSecurityContext ignored = ctx.security().withContext(newSecSubjId);
AutoCloseable ignored0 = ctx.sessionContext().withContext(sesAttrs)
) {
lsnr.onMessage(nodeId, msg, plc);
}
Expand Down Expand Up @@ -2137,17 +2134,15 @@ private long getInverseConnectionWaitTimeout() {
long timeout,
boolean skipOnTimeout
) {
Map<String, String> sesAttrs = ctx.sessionContext().attributes();

boolean secEnabled = ctx.security().enabled();

if (secEnabled || sesAttrs != null) {
if (secEnabled) {
UUID secSubjId = null;

if (secEnabled && !ctx.security().isDefaultContext())
secSubjId = ctx.security().securityContext().subject().id();

return new GridIoSecurityAwareMessage(secSubjId, sesAttrs, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
}

return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
Expand Down Expand Up @@ -3924,7 +3919,7 @@ void unwind(GridMessageListener lsnr) {

MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(fmc.message));

invokeListener(plc, lsnr, nodeId, mc.message.message(), secSubjId(mc.message), sessionAttributes(mc.message));
invokeListener(plc, lsnr, nodeId, mc.message.message(), secSubjId(mc.message));
}
finally {
if (mc.closure != null)
Expand Down Expand Up @@ -4366,16 +4361,6 @@ private UUID secSubjId(GridIoMessage msg) {
return null;
}

/**
* @return Session attributes.
*/
private Map<String, String> sessionAttributes(GridIoMessage msg) {
if (msg instanceof GridIoSecurityAwareMessage)
return ((GridIoSecurityAwareMessage)msg).sessionAttributes();

return null;
}

/**
* Responsible for handling network situation where server cannot open connection to client and
* has to ask client to establish a connection to specific server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
Expand All @@ -39,10 +36,7 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
public static final short TYPE_CODE = 174;

/** Security subject id that will be used during message processing on an remote node. */
private @Nullable UUID secSubjId;

/** Session attributes. */
private @Nullable Map<String, String> sesAttrs;
private UUID secSubjId;

/**
* No-op constructor to support {@link Externalizable} interface.
Expand All @@ -54,7 +48,6 @@ public GridIoSecurityAwareMessage() {

/**
* @param secSubjId Security subject id.
* @param sesAttrs Session attributes.
* @param plc Policy.
* @param topic Communication topic.
* @param topicOrd Topic ordinal value.
Expand All @@ -64,8 +57,7 @@ public GridIoSecurityAwareMessage() {
* @param skipOnTimeout Whether message can be skipped on timeout.
*/
public GridIoSecurityAwareMessage(
@Nullable UUID secSubjId,
@Nullable Map<String, String> sesAttrs,
UUID secSubjId,
byte plc,
Object topic,
int topicOrd,
Expand All @@ -77,7 +69,6 @@ public GridIoSecurityAwareMessage(
super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);

this.secSubjId = secSubjId;
this.sesAttrs = sesAttrs;
}

/**
Expand All @@ -87,13 +78,6 @@ public GridIoSecurityAwareMessage(
return secSubjId;
}

/**
* @return Session attributes.
*/
@Nullable Map<String, String> sessionAttributes() {
return sesAttrs == null ? null : Collections.unmodifiableMap(sesAttrs);
}

/** {@inheritDoc} */
@Override public short directType() {
return TYPE_CODE;
Expand Down Expand Up @@ -125,11 +109,6 @@ public GridIoSecurityAwareMessage(

writer.incrementState();

case 9:
if (!writer.writeMap("sesAttrs", sesAttrs, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING))
return false;

writer.incrementState();
}

return true;
Expand All @@ -149,14 +128,6 @@ public GridIoSecurityAwareMessage(
case 8:
secSubjId = reader.readUuid("secSubjId");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 9:
sesAttrs = reader.readMap("sesAttrs", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false);

if (!reader.isLastRead())
return false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public void testThrowIllegalStateExceptionIfNodeNotFoundInDiscoCache() throws Ex

spi.sendMessage(srv.localNode(), new GridIoSecurityAwareMessage(
UUID.randomUUID(),
null,
PUBLIC_POOL,
TOPIC_CACHE,
TOPIC_CACHE.ordinal(),
Expand Down

0 comments on commit 0f93a5c

Please sign in to comment.