Skip to content

Passing of authenticated user in query lifecycle callbacks in GraphManager. #3137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 3.7-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.server;

import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.structure.Graph;
Expand Down Expand Up @@ -125,26 +126,48 @@ public default boolean hasAnyOpenTransactions() {
return graph.features().graph().supportsTransactions() && graph.tx().isOpen();
});
}

/**
* This method will be called before a script or query is processed by the
* gremlin-server.
*
* @param msg the {@link RequestMessage} received by the gremlin-server.
*/
default void beforeQueryStart(final RequestMessage msg) {

}

/**
* This method will be called before a script or query is processed by the
* gremlin-server.
* <p>
* This method delegates call to the {@link #beforeQueryStart(RequestMessage)} by default, this functionality
* should be preserved by overriding methods if call of the former method is still needed.
*
* @param msg the {@link RequestMessage} received by the gremlin-server.
* @param user User authenticated in channel processing request
*/
default void beforeQueryStart(final RequestMessage msg, AuthenticatedUser user) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to account for evolving lifecycle requirements it would be more flexible for the overloaded method to accept a single query context parameter that can be adapted to contain more information as needed in the future. This way if more data is needed, it need only be added to the query context and no additional overloaded method is needed.

Something like:

 /**
   * This method will be called before a script or query is processed by the
   * gremlin-server.
   *
   * @param msg the {@link RequestMessage} received by the gremlin-server.
   * @deprecated replaced by {@link #beforeQueryStart(QueryContext)}
   */
  default void beforeQueryStart(final RequestMessage msg) {
      beforeQueryStart(new QueryContext(msg, null));
  }

  default void beforeQueryStart(final QueryContext context) {
  }
  
  public static class QueryContext {
      private RequestMessage msg;
      private AuthenticatedUser user;
      // other fields can be added in the future without breaking backwards compatibility
      
      public QueryContext(final RequestMessage msg, final AuthenticatedUser user) {
          this.msg = msg;
          this.user = user;
      }
      public RequestMessage getMessage() {
          return msg;
      }
      public AuthenticatedUser getUser() {
          return user;
      }
  }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will update code soon.

beforeQueryStart(msg);
}

/**
* This method is called after the commit / rollback of transaction is called at the end
* of query processing. While {@link #onQuerySuccess(RequestMessage)} and {@link #onQueryError(RequestMessage, Throwable)}
* methods are called <b>before</b> transaction will be completed.
*
* @param msg the {@link RequestMessage} received by the gremlin-server.
*/
default void afterQueryEnd(final RequestMessage msg) {
}

/**
* This method will be called if a script or query is processed by the
* gremlin-server throws an error.
*
* @param msg the {@link RequestMessage} received by the gremlin-server.
* @param error the exception encountered during processing from the gremlin-server.
*/
default void onQueryError(final RequestMessage msg, final Throwable error) {

}

/**
Expand All @@ -153,6 +176,5 @@ default void onQueryError(final RequestMessage msg, final Throwable error) {
* @param msg the {@link RequestMessage} received by the gremlin-server.
*/
default void onQuerySuccess(final RequestMessage msg) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ protected AbstractEvalOpProcessor(final boolean manageTransactions) {

/**
* Provides an operation for evaluating a Gremlin script.
*
* @return
*/
public abstract ThrowingConsumer<Context> getEvalOp();

/**
* A sub-class may have additional "ops" that it will service. Calls to {@link OpProcessor#select(Context)} that are not
* handled will be passed to this method to see if the sub-class can service the requested op code.
*
* @return
*/
public abstract Optional<ThrowingConsumer<Context>> selectOther(final Context ctx) throws OpProcessorException;
Expand Down Expand Up @@ -201,12 +203,13 @@ protected Optional<ThrowingConsumer<Context>> validateEvalMessage(final RequestM
* iteration timeouts, metrics and building bindings. Note that result iteration is delegated to the
* {@link #handleIterator(Context, Iterator)} method, so those extending this class could override that method for
* better control over result iteration.
* @param ctx The current Gremlin Server {@link Context}. This handler ensures that only a single final
* response is sent to the client.
*
* @param ctx The current Gremlin Server {@link Context}. This handler ensures that only a single final
* response is sent to the client.
* @param gremlinExecutorSupplier A function that returns the {@link GremlinExecutor} to use in executing the
* script evaluation.
* @param bindingsSupplier A function that returns the {@link Bindings} to provide to the
* {@link GremlinExecutor#eval} method.
* @param bindingsSupplier A function that returns the {@link Bindings} to provide to the
* {@link GremlinExecutor#eval} method.
*/
protected void evalOpInternal(final Context ctx, final Supplier<GremlinExecutor> gremlinExecutorSupplier,
final BindingSupplier bindingsSupplier) {
Expand All @@ -233,15 +236,23 @@ protected void evalOpInternal(final Context ctx, final Supplier<GremlinExecutor>

final GremlinExecutor.LifeCycle lifeCycle = GremlinExecutor.LifeCycle.build()
.evaluationTimeoutOverride(seto)
.afterFailure((b,t) -> {
.afterFailure((b, t) -> {
graphManager.onQueryError(msg, t);
if (managedTransactionsForRequest) attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement);
if (managedTransactionsForRequest)
attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement);
graphManager.afterQueryEnd(msg);
})
.afterTimeout((b, t) -> {
graphManager.onQueryError(msg, t);
graphManager.onQueryError(msg, t);
graphManager.afterQueryEnd(msg);
})
.beforeEval(b -> {
graphManager.beforeQueryStart(msg);
AuthenticatedUser user = ctx.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get();
if (null == user) {
// This is expected when using the AllowAllAuthenticator
user = AuthenticatedUser.ANONYMOUS_USER;
}
graphManager.beforeQueryStart(msg, user);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering where the best place to unit or integration test the availability of the 'user' to the lifecycle. Perhaps AbstractEvalOpProcessorTest.java (requires a lot of mocking and perhaps stubbing).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that I understood your observation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also suggest to include some form of testing for this addition, and mocking is probably the best way, though it can get rather complex. I believe Andrea is suggesting to place a set of tests inside of AbstractEvalOpProcessorTest.java?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest adding a test that checks that the processor redirected the context parameter correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was imagining the test would check that the AuthenticatedUser can be accessed by the beforeQueryStart method as part of the lifecycle.

Copy link
Author

@andrii0lomakin andrii0lomakin Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrii0lomakin Yes, but that is very high level in what scenario? In a scenario, when does it already exist in the channel as an attribute? Or should this test cover all cases of authentication? I believe that such a test case should either exist or be added as a separate change, as that is not the point of this PR to test the correctness of authentication mechanics. As that is a very resource-consuming task, it should be handled separately. I do not mind providing such testing, as security is paramount for us, but if such tests already exist in this case, I will only add checking of the parameters of this callback there. Otherwise, I will test authentication mechanics separately, as we for sure will not go in prod with authentication holes. Do you have any objections?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrii0lomakin

that is not the point of this PR to test the correctness of authentication mechanics.

I agree testing authentication mechanics are not in scope of this PR. It was not my intent to suggest that.

I will only add checking of the parameters of this callback there

Agree, that is all you should have to do for this change. My initial comment was that I could not immediately identify the best existing test to add this check to.

try {
b.putAll(bindingsSupplier.get());
} catch (OpProcessorException ope) {
Expand All @@ -268,13 +279,16 @@ protected void evalOpInternal(final Context ctx, final Supplier<GremlinExecutor>
handleIterator(ctx, itty);
graphManager.onQuerySuccess(msg);
} catch (Exception ex) {
if (managedTransactionsForRequest) attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement);

graphManager.onQueryError(msg, ex);
if (managedTransactionsForRequest)
attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement);
CloseableIterator.closeIterator(itty);

// wrap up the exception and rethrow. the error will be written to the client by the evalFuture
// as it will completeExceptionally in the GremlinExecutor
throw new RuntimeException(ex);
} finally {
graphManager.afterQueryEnd(msg);
}
}).create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,17 @@ private void iterateBytecodeTraversal(final Context context) throws Exception {
context.writeAndFlush(specialResponseMsg.create());
} else if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) {
graphManager.onQueryError(msg, t);
graphManager.afterQueryEnd(msg);
final String errorMessage = String.format("A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout", msg);
logger.warn(errorMessage);
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
.statusMessage(errorMessage)
.statusAttributeException(ex).create());
.statusMessage(errorMessage)
.statusAttributeException(ex).create());
} else {
logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(ex.getMessage())
.statusAttributeException(ex).create());
.statusMessage(ex.getMessage())
.statusAttributeException(ex).create());
}
onError(graph, context, ex);
}
Expand Down Expand Up @@ -322,24 +323,32 @@ private void iterateBytecodeTraversal(final Context context) throws Exception {
}

protected void beforeProcessing(final Graph graph, final Context ctx) {
final GraphManager graphManager = ctx.getGraphManager();
final RequestMessage msg = ctx.getRequestMessage();
graphManager.beforeQueryStart(msg);
final GraphManager graphManager = ctx.getGraphManager();
final RequestMessage msg = ctx.getRequestMessage();

AuthenticatedUser user = ctx.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get();
if (null == user) { // This is expected when using the AllowAllAuthenticator
user = AuthenticatedUser.ANONYMOUS_USER;
}
graphManager.beforeQueryStart(msg, user);

if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
}

protected void onError(final Graph graph, final Context ctx, Throwable error) {
final GraphManager graphManager = ctx.getGraphManager();
final RequestMessage msg = ctx.getRequestMessage();
graphManager.onQueryError(msg, error);
graphManager.onQueryError(msg, error);
if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
graphManager.afterQueryEnd(msg);
}

protected void onTraversalSuccess(final Graph graph, final Context ctx) {
final GraphManager graphManager = ctx.getGraphManager();
final RequestMessage msg = ctx.getRequestMessage();
graphManager.onQuerySuccess(msg);
if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit();
graphManager.afterQueryEnd(msg);
}

protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws InterruptedException {
Expand Down Expand Up @@ -398,7 +407,8 @@ protected void handleIterator(final Context context, final Iterator itty, final
// this could be placed inside the isWriteable() portion of the if-then below but it seems better to
// allow iteration to continue into a batch if that is possible rather than just doing nothing at all
// while waiting for the client to catch up
if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush)
aggregate.add(itty.next());

// Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
// not active, and hence we should break the loop.
Expand Down Expand Up @@ -427,7 +437,7 @@ protected void handleIterator(final Context context, final Iterator itty, final
Frame frame = null;
try {
frame = makeFrame(context, msg, serializer, useBinary, aggregate, code,
metadata, statusAttrb);
metadata, statusAttrb);
} catch (Exception ex) {
// a frame may use a Bytebuf which is a countable release - if it does not get written
// downstream it needs to be released here
Expand Down