diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 26b76ab815..0f53cb9481 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -121,20 +121,15 @@ public static void shutdownNow() } } - public final static Runnable NO_OP_TASK = new Runnable() - { - public void run() - { - - } - }; - @VisibleForTesting + // TODO(tpetracca): can this be deleted? public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { ExecutorUtils.shutdownNowAndWait(timeout, unit, StageManager.stages.values()); } + public final static Runnable NO_OP_TASK = () -> {}; + /** * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the * tracing stage. See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 1e291c2afe..49bc29bc45 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -33,10 +33,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; - -import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER; -import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE; import static org.apache.cassandra.tracing.Tracing.isTracing; public class MessageOut @@ -61,8 +57,7 @@ public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer payload, serializer, isTracing() - ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()), - TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) }) + ? Tracing.instance.getTraceHeaders() : Collections.emptyMap()); } diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index b79d0c5035..925f16ee43 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -314,7 +314,7 @@ private void writeConnected(QueuedMessage qm, boolean flush) { byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE); Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL()); + Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL()); } else { diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index ddbc9597c4..c70c6927cb 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -18,6 +18,9 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -75,15 +78,20 @@ public void prepareTracingSession(UUID sessionId) } public void createTracingSession() + { + createTracingSession(Collections.EMPTY_MAP); + } + + public void createTracingSession(Map customPayload) { UUID session = this.preparedTracingSession; if (session == null) { - Tracing.instance.newSession(); + Tracing.instance.newSession(customPayload); } else { - Tracing.instance.newSession(session); + Tracing.instance.newSession(session, customPayload); this.preparedTracingSession = null; } } diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java index 5cc3c21107..bc8d5dd808 100644 --- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,33 +7,44 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.cassandra.tracing; -import java.util.UUID; - import org.apache.cassandra.utils.FBUtilities; -public class ExpiredTraceState extends TraceState +class ExpiredTraceState extends TraceState { - public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType) + private final TraceState delegate; + + ExpiredTraceState(TraceState delegate) { - super(FBUtilities.getBroadcastAddress(), sessionId, traceType); + super(FBUtilities.getBroadcastAddress(), delegate.sessionId, delegate.traceType); + this.delegate = delegate; } public int elapsed() { return -1; } + + protected void traceImpl(String message) + { + delegate.traceImpl(message); + } + + protected void waitForPendingEvents() + { + delegate.waitForPendingEvents(); + } } diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index 88f36d8f91..ec2bc9e26b 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -19,7 +19,6 @@ import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; @@ -27,19 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Stopwatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.exceptions.OverloadedException; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; import org.apache.cassandra.utils.progress.ProgressListener; @@ -48,12 +37,8 @@ * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an * operation is being traced. */ -public class TraceState implements ProgressEventNotifier +public abstract class TraceState implements ProgressEventNotifier { - private static final Logger logger = LoggerFactory.getLogger(TraceState.class); - private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = - Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "0")); - public final UUID sessionId; public final InetAddress coordinator; public final Stopwatch watch; @@ -78,7 +63,7 @@ public enum Status // See CASSANDRA-7626 for more details. private final AtomicInteger references = new AtomicInteger(1); - public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + protected TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) { assert coordinator != null; assert sessionId != null; @@ -90,7 +75,7 @@ public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType tra this.ttl = traceType.getTTL(); watch = Stopwatch.createStarted(); this.status = Status.IDLE; -} + } /** * Activate notification with provided {@code tag} name. @@ -160,7 +145,7 @@ public synchronized Status waitActivity(long timeout) return status; } - private synchronized void notifyActivity() + protected synchronized void notifyActivity() { status = Status.ACTIVE; notifyAll(); @@ -186,12 +171,7 @@ public void trace(String message) if (notify) notifyActivity(); - final String threadName = Thread.currentThread().getName(); - final int elapsed = elapsed(); - - executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); - if (logger.isTraceEnabled()) - logger.trace("Adding <{}> to trace events", message); + traceImpl(message); for (ProgressListener listener : listeners) { @@ -199,72 +179,9 @@ public void trace(String message) } } - static void executeMutation(final Mutation mutation) - { - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() - { - protected void runMayThrow() throws Exception - { - mutateWithCatch(mutation); - } - }); - } - - /** - * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces - * that are not initiated by local node == coordinator). - */ - public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl) - { - final String threadName = Thread.currentThread().getName(); - - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() - { - public void runMayThrow() - { - mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, elapsed, threadName, ttl)); - } - }); - } - - static void mutateWithCatch(Mutation mutation) - { - try - { - StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY); - } - catch (OverloadedException e) - { - Tracing.logger.warn("Too many nodes are overloaded to save trace events"); - } - } - - /** - * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations - * have at least been applied to one replica. This works because the tracking executor only - * has one thread in its pool, see {@link StageManager#tracingExecutor()}. - */ - protected void waitForPendingEvents() - { - if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) - return; - - try - { - if (logger.isTraceEnabled()) - logger.trace("Waiting for up to {} seconds for trace events to complete", - +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); - - StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) - .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - logger.debug("Failed to wait for tracing events to complete: {}", t); - } - } + protected abstract void traceImpl(String message); + protected abstract void waitForPendingEvents(); public boolean acquireReference() { diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java new file mode 100644 index 0000000000..6e9f300da6 --- /dev/null +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.WrappedRunnable; + +/** + * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an + * operation is being traced. + */ +public class TraceStateImpl extends TraceState +{ + private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class); + + private static int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = + Integer.parseInt(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "0")); + + public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + { + super(coordinator, sessionId, traceType); + } + + protected void traceImpl(String message) + { + final String threadName = Thread.currentThread().getName(); + final int elapsed = elapsed(); + + executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); + if (logger.isTraceEnabled()) + logger.trace("Adding <{}> to trace events", message); + } + + /** + * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations + * have at least been applied to one replica. This works because the tracking executor only + * has one thread in its pool, see {@link StageManager#tracingExecutor()}. + */ + protected void waitForPendingEvents() + { + if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) + return; + + try + { + if (logger.isTraceEnabled()) + logger.trace("Waiting for up to {} seconds for trace events to complete", + WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); + + StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) + .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.debug("Failed to wait for tracing events to complete: {}", t); + } + } + + static void executeMutation(final Mutation mutation) + { + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + protected void runMayThrow() throws Exception + { + mutateWithCatch(mutation); + } + }); + } + + static void mutateWithCatch(Mutation mutation) + { + try + { + StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY); + } + catch (OverloadedException e) + { + Tracing.logger.warn("Too many nodes are overloaded to save trace events"); + } + } + +} diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index bf9cee763c..e69645f715 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -21,11 +21,13 @@ import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,14 +37,15 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.UUIDGen; /** * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may - * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace. + * have multiple local and remote events before it is completed. */ -public class Tracing implements ExecutorLocal +public abstract class Tracing implements ExecutorLocal { public static final String TRACE_HEADER = "TraceSession"; public static final String TRACE_TYPE = "TraceType"; @@ -77,15 +80,34 @@ public int getTTL() } } - static final Logger logger = LoggerFactory.getLogger(Tracing.class); + protected static final Logger logger = LoggerFactory.getLogger(Tracing.class); private final InetAddress localAddress = FBUtilities.getLocalAddress(); private final ThreadLocal state = new ThreadLocal<>(); - private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + protected final ConcurrentMap sessions = new ConcurrentHashMap<>(); - public static final Tracing instance = new Tracing(); + public static final Tracing instance; + + static { + Tracing tracing = null; + String customTracingClass = System.getProperty("cassandra.custom_tracing_class"); + if (null != customTracingClass) + { + try + { + tracing = FBUtilities.construct(customTracingClass, "Tracing"); + logger.info("Using {} as tracing queries (as requested with -Dcassandra.custom_tracing_class)", customTracingClass); + } + catch (Exception e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.error("Cannot use class {} for tracing ({}), ignoring by defaulting on normal tracing", customTracingClass, e.getMessage()); + } + } + instance = null != tracing ? tracing : new TracingImpl(); + } public UUID getSessionId() { @@ -110,30 +132,33 @@ public int getTTL() */ public static boolean isTracing() { - return instance.state.get() != null; + return instance.get() != null; } - public UUID newSession() + public UUID newSession(Map customPayload) { return newSession(TraceType.QUERY); } public UUID newSession(TraceType traceType) { - return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType); + return newSession( + TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), + traceType, + Collections.EMPTY_MAP); } - public UUID newSession(UUID sessionId) + public UUID newSession(UUID sessionId, Map customPayload) { - return newSession(sessionId, TraceType.QUERY); + return newSession(sessionId, TraceType.QUERY, Collections.EMPTY_MAP); } - private UUID newSession(UUID sessionId, TraceType traceType) + protected UUID newSession(UUID sessionId, TraceType traceType, Map customPayload) { - assert state.get() == null; + assert get() == null; - TraceState ts = new TraceState(localAddress, sessionId, traceType); - state.set(ts); + TraceState ts = newTraceState(localAddress, sessionId, traceType); + set(ts); sessions.put(sessionId, ts); return sessionId; @@ -145,30 +170,29 @@ public void doneWithNonLocalSession(TraceState state) sessions.remove(state.sessionId); } + /** * Stop the session and record its complete. Called by coodinator when request is complete. */ public void stopSession() { - TraceState state = this.state.get(); + TraceState state = get(); if (state == null) // inline isTracing to avoid implicit two calls to state.get() { logger.trace("request complete"); } else { - final int elapsed = state.elapsed(); - final ByteBuffer sessionId = state.sessionIdBytes; - final int ttl = state.ttl; - - TraceState.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl)); + stopSessionImpl(); state.stop(); sessions.remove(state.sessionId); - this.state.set(null); + set(null); } } + protected abstract void stopSessionImpl(); + public TraceState get() { return state.get(); @@ -189,24 +213,11 @@ public TraceState begin(final String request, final Map paramete return begin(request, null, parameters); } - public TraceState begin(final String request, final InetAddress client, final Map parameters) - { - assert isTracing(); - - final TraceState state = this.state.get(); - final long startedAt = System.currentTimeMillis(); - final ByteBuffer sessionId = state.sessionIdBytes; - final String command = state.traceType.toString(); - final int ttl = state.ttl; - - TraceState.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl)); - - return state; - } + public abstract TraceState begin(String request, InetAddress client, Map parameters); /** * Determines the tracing context from a message. Does NOT set the threadlocal state. - * + * * @param message The internode message */ public TraceState initializeFromMessage(final MessageIn message) @@ -218,7 +229,7 @@ public TraceState initializeFromMessage(final MessageIn message) assert sessionBytes.length == 16; UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - TraceState ts = sessions.get(sessionId); + TraceState ts = get(sessionId); if (ts != null && ts.acquireReference()) return ts; @@ -230,16 +241,26 @@ public TraceState initializeFromMessage(final MessageIn message) if (message.verb == MessagingService.Verb.REQUEST_RESPONSE) { // received a message for a session we've already closed out. see CASSANDRA-5668 - return new ExpiredTraceState(sessionId, traceType); + return new ExpiredTraceState(newTraceState(message.from, sessionId, traceType)); } else { - ts = new TraceState(message.from, sessionId, traceType); + ts = newTraceState(message.from, sessionId, traceType); sessions.put(sessionId, ts); return ts; } } + public Map getTraceHeaders() + { + assert isTracing(); + + return ImmutableMap.of( + TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()), + TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) }); + } + + protected abstract TraceState newTraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType); // repair just gets a varargs method since it's so heavyweight anyway public static void traceRepair(String format, Object... args) @@ -287,4 +308,10 @@ public static void trace(String format, Object... args) state.trace(format, args); } + + /** + * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces + * that are not initiated by local node == coordinator). + */ + public abstract void trace(ByteBuffer sessionId, String message, int ttl); } diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java b/src/java/org/apache/cassandra/tracing/TracingImpl.java new file mode 100644 index 0000000000..52ac183cf6 --- /dev/null +++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.utils.WrappedRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may + * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace. + */ +class TracingImpl extends Tracing +{ + private static final Logger logger = LoggerFactory.getLogger(TracingImpl.class); + + public void stopSessionImpl() { + TraceState state = get(); + int elapsed = state.elapsed(); + ByteBuffer sessionId = state.sessionIdBytes; + int ttl = state.ttl; + TraceStateImpl.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl)); + } + + public TraceState begin(final String request, final InetAddress client, final Map parameters) + { + assert isTracing(); + + final TraceState state = get(); + final long startedAt = System.currentTimeMillis(); + final ByteBuffer sessionId = state.sessionIdBytes; + final String command = state.traceType.toString(); + final int ttl = state.ttl; + + TraceStateImpl.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl)); + + return state; + } + + @Override + protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType) + { + return new TraceStateImpl(coordinator, sessionId, traceType); + } + + /** + * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces + * that are not initiated by local node == coordinator). + */ + public void trace(final ByteBuffer sessionId, final String message, final int ttl) + { + final String threadName = Thread.currentThread().getName(); + + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + public void runMayThrow() + { + TraceStateImpl.mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, -1, threadName, ttl)); + } + }); + } +} diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 11a227c33e..287d777891 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -122,7 +122,7 @@ public Message.Response execute(QueryState state) if (state.traceNextQuery()) { - state.createTracingSession(); + state.createTracingSession(getCustomPayload()); ImmutableMap.Builder builder = ImmutableMap.builder(); if (options.getPageSize() > 0) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 6553fb94de..01354a3f20 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@ -68,7 +68,7 @@ public Future asyncExecuteWithTracingWithResult(UUID sessionI return instance.async(() -> { try { - Tracing.instance.newSession(sessionId); + Tracing.instance.newSession(sessionId, Collections.EMPTY_MAP); return executeInternal(query, consistencyLevelOrigin, boundValues); } finally diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index aca1cfd479..697336d7a0 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -369,7 +369,7 @@ public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress t { byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE); Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL()); + Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), traceMessage, traceType.getTTL()); } else { diff --git a/test/unit/org/apache/cassandra/tracing/TracingTest.java b/test/unit/org/apache/cassandra/tracing/TracingTest.java new file mode 100644 index 0000000000..30521c0bf0 --- /dev/null +++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressListener; + +public final class TracingTest +{ + + @Test + public void test() + { + List traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.NONE); + TraceState state = tracing.begin("test-request", Collections.emptyMap()); + state.trace("test-1"); + state.trace("test-2"); + state.trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_get() + { + List traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.NONE); + tracing.begin("test-request", Collections.emptyMap()); + tracing.get().trace("test-1"); + tracing.get().trace("test-2"); + tracing.get().trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_get_uuid() + { + List traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + UUID uuid = tracing.newSession(Tracing.TraceType.NONE); + tracing.begin("test-request", Collections.emptyMap()); + tracing.get(uuid).trace("test-1"); + tracing.get(uuid).trace("test-2"); + tracing.get(uuid).trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_states() + { + List traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.REPAIR); + tracing.begin("test-request", Collections.emptyMap()); + tracing.get().enableActivityNotification("test-tag"); + assert TraceState.Status.IDLE == tracing.get().waitActivity(1); + tracing.get().trace("test-1"); + assert TraceState.Status.ACTIVE == tracing.get().waitActivity(1); + tracing.get().stop(); + assert TraceState.Status.STOPPED == tracing.get().waitActivity(1); + tracing.stopSession(); + assert null == tracing.get(); + } + + @Test + public void test_progress_listener() + { + List traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.REPAIR); + tracing.begin("test-request", Collections.emptyMap()); + tracing.get().enableActivityNotification("test-tag"); + + tracing.get().addProgressListener( + new ProgressListener() + { + public void progress(String tag, ProgressEvent pe) + { + assert "test-tag".equals(tag); + assert "test-trace".equals(pe.getMessage()); + } + }); + + tracing.get().trace("test-trace"); + tracing.stopSession(); + assert null == tracing.get(); + } + + private class TracingImpl extends Tracing + { + private final List traces; + + public TracingImpl(List traces) + { + this.traces = traces; + } + + public void stopSessionImpl() + {} + + public TraceState begin(String request, InetAddress ia, Map map) + { + traces.add(request); + return get(); + } + + protected TraceState newTraceState(InetAddress ia, UUID uuid, Tracing.TraceType tt) + { + return new TraceState(ia, uuid, tt) + { + protected void traceImpl(String string) + { + traces.add(string); + } + + protected void waitForPendingEvents() + { + } + }; + } + + public void trace(ByteBuffer bb, String string, int i) + { + throw new UnsupportedOperationException("Not supported yet."); + } + } +}