diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GraphManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GraphManager.java index bbd9ab51b5a..6619c92081a 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GraphManager.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GraphManager.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.server; +import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -125,7 +126,7 @@ public default boolean hasAnyOpenTransactions() { return graph.features().graph().supportsTransactions() && graph.tx().isOpen(); }); } - + /** * This method will be called before a script or query is processed by the * gremlin-server. @@ -133,18 +134,40 @@ public default boolean hasAnyOpenTransactions() { * @param msg the {@link RequestMessage} received by the gremlin-server. */ default void beforeQueryStart(final RequestMessage msg) { - } /** * This method will be called before a script or query is processed by the * gremlin-server. + *

+ * This method delegates call to the {@link #beforeQueryStart(RequestMessage)} by default, this functionality + * should be preserved by overriding methods if call of the former method is still needed. + * + * @param msg the {@link RequestMessage} received by the gremlin-server. + * @param user User authenticated in channel processing request + */ + default void beforeQueryStart(final RequestMessage msg, AuthenticatedUser user) { + beforeQueryStart(msg); + } + + /** + * This method is called after the commit / rollback of transaction is called at the end + * of query processing. While {@link #onQuerySuccess(RequestMessage)} and {@link #onQueryError(RequestMessage, Throwable)} + * methods are called before transaction will be completed. * * @param msg the {@link RequestMessage} received by the gremlin-server. + */ + default void afterQueryEnd(final RequestMessage msg) { + } + + /** + * This method will be called if a script or query is processed by the + * gremlin-server throws an error. + * + * @param msg the {@link RequestMessage} received by the gremlin-server. * @param error the exception encountered during processing from the gremlin-server. */ default void onQueryError(final RequestMessage msg, final Throwable error) { - } /** @@ -153,6 +176,5 @@ default void onQueryError(final RequestMessage msg, final Throwable error) { * @param msg the {@link RequestMessage} received by the gremlin-server. */ default void onQuerySuccess(final RequestMessage msg) { - } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java index d89525638f1..37c5608254d 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java @@ -131,6 +131,7 @@ protected AbstractEvalOpProcessor(final boolean manageTransactions) { /** * Provides an operation for evaluating a Gremlin script. + * * @return */ public abstract ThrowingConsumer getEvalOp(); @@ -138,6 +139,7 @@ protected AbstractEvalOpProcessor(final boolean manageTransactions) { /** * A sub-class may have additional "ops" that it will service. Calls to {@link OpProcessor#select(Context)} that are not * handled will be passed to this method to see if the sub-class can service the requested op code. + * * @return */ public abstract Optional> selectOther(final Context ctx) throws OpProcessorException; @@ -201,12 +203,13 @@ protected Optional> validateEvalMessage(final RequestM * iteration timeouts, metrics and building bindings. Note that result iteration is delegated to the * {@link #handleIterator(Context, Iterator)} method, so those extending this class could override that method for * better control over result iteration. - * @param ctx The current Gremlin Server {@link Context}. This handler ensures that only a single final - * response is sent to the client. + * + * @param ctx The current Gremlin Server {@link Context}. This handler ensures that only a single final + * response is sent to the client. * @param gremlinExecutorSupplier A function that returns the {@link GremlinExecutor} to use in executing the * script evaluation. - * @param bindingsSupplier A function that returns the {@link Bindings} to provide to the - * {@link GremlinExecutor#eval} method. + * @param bindingsSupplier A function that returns the {@link Bindings} to provide to the + * {@link GremlinExecutor#eval} method. */ protected void evalOpInternal(final Context ctx, final Supplier gremlinExecutorSupplier, final BindingSupplier bindingsSupplier) { @@ -233,15 +236,23 @@ protected void evalOpInternal(final Context ctx, final Supplier final GremlinExecutor.LifeCycle lifeCycle = GremlinExecutor.LifeCycle.build() .evaluationTimeoutOverride(seto) - .afterFailure((b,t) -> { + .afterFailure((b, t) -> { graphManager.onQueryError(msg, t); - if (managedTransactionsForRequest) attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement); + if (managedTransactionsForRequest) + attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement); + graphManager.afterQueryEnd(msg); }) .afterTimeout((b, t) -> { - graphManager.onQueryError(msg, t); + graphManager.onQueryError(msg, t); + graphManager.afterQueryEnd(msg); }) .beforeEval(b -> { - graphManager.beforeQueryStart(msg); + AuthenticatedUser user = ctx.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get(); + if (null == user) { + // This is expected when using the AllowAllAuthenticator + user = AuthenticatedUser.ANONYMOUS_USER; + } + graphManager.beforeQueryStart(msg, user); try { b.putAll(bindingsSupplier.get()); } catch (OpProcessorException ope) { @@ -268,13 +279,16 @@ protected void evalOpInternal(final Context ctx, final Supplier handleIterator(ctx, itty); graphManager.onQuerySuccess(msg); } catch (Exception ex) { - if (managedTransactionsForRequest) attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement); - + graphManager.onQueryError(msg, ex); + if (managedTransactionsForRequest) + attemptRollback(msg, ctx.getGraphManager(), settings.strictTransactionManagement); CloseableIterator.closeIterator(itty); // wrap up the exception and rethrow. the error will be written to the client by the evalFuture // as it will completeExceptionally in the GremlinExecutor throw new RuntimeException(ex); + } finally { + graphManager.afterQueryEnd(msg); } }).create(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java index 7b2630a37a8..78e69d71e4f 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java @@ -248,16 +248,17 @@ private void iterateBytecodeTraversal(final Context context) throws Exception { context.writeAndFlush(specialResponseMsg.create()); } else if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) { graphManager.onQueryError(msg, t); + graphManager.afterQueryEnd(msg); final String errorMessage = String.format("A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout", msg); logger.warn(errorMessage); context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) - .statusMessage(errorMessage) - .statusAttributeException(ex).create()); + .statusMessage(errorMessage) + .statusAttributeException(ex).create()); } else { logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex); context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) - .statusMessage(ex.getMessage()) - .statusAttributeException(ex).create()); + .statusMessage(ex.getMessage()) + .statusAttributeException(ex).create()); } onError(graph, context, ex); } @@ -322,17 +323,24 @@ private void iterateBytecodeTraversal(final Context context) throws Exception { } protected void beforeProcessing(final Graph graph, final Context ctx) { - final GraphManager graphManager = ctx.getGraphManager(); - final RequestMessage msg = ctx.getRequestMessage(); - graphManager.beforeQueryStart(msg); + final GraphManager graphManager = ctx.getGraphManager(); + final RequestMessage msg = ctx.getRequestMessage(); + + AuthenticatedUser user = ctx.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get(); + if (null == user) { // This is expected when using the AllowAllAuthenticator + user = AuthenticatedUser.ANONYMOUS_USER; + } + graphManager.beforeQueryStart(msg, user); + if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback(); } protected void onError(final Graph graph, final Context ctx, Throwable error) { final GraphManager graphManager = ctx.getGraphManager(); final RequestMessage msg = ctx.getRequestMessage(); - graphManager.onQueryError(msg, error); + graphManager.onQueryError(msg, error); if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback(); + graphManager.afterQueryEnd(msg); } protected void onTraversalSuccess(final Graph graph, final Context ctx) { @@ -340,6 +348,7 @@ protected void onTraversalSuccess(final Graph graph, final Context ctx) { final RequestMessage msg = ctx.getRequestMessage(); graphManager.onQuerySuccess(msg); if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit(); + graphManager.afterQueryEnd(msg); } protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws InterruptedException { @@ -398,7 +407,8 @@ protected void handleIterator(final Context context, final Iterator itty, final // this could be placed inside the isWriteable() portion of the if-then below but it seems better to // allow iteration to continue into a batch if that is possible rather than just doing nothing at all // while waiting for the client to catch up - if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next()); + if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) + aggregate.add(itty.next()); // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is // not active, and hence we should break the loop. @@ -427,7 +437,7 @@ protected void handleIterator(final Context context, final Iterator itty, final Frame frame = null; try { frame = makeFrame(context, msg, serializer, useBinary, aggregate, code, - metadata, statusAttrb); + metadata, statusAttrb); } catch (Exception ex) { // a frame may use a Bytebuf which is a countable release - if it does not get written // downstream it needs to be released here