Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport CASSANDRA-10392 - Custom Tracing Implementations #600

Open
wants to merge 6 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/java/org/apache/cassandra/concurrent/StageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,15 @@ public static void shutdownNow()
}
}

public final static Runnable NO_OP_TASK = new Runnable()
{
public void run()
{

}
};

/**
 * Passes every value of {@code StageManager.stages} to
 * {@code ExecutorUtils.shutdownNowAndWait} together with the supplied timeout.
 *
 * @param timeout timeout value forwarded to {@code ExecutorUtils.shutdownNowAndWait}
 * @param unit unit in which {@code timeout} is expressed
 * @throws InterruptedException declared here; presumably propagated when the wait is interrupted (confirm in ExecutorUtils)
 * @throws TimeoutException declared here; presumably propagated when the stages do not stop within the timeout (confirm in ExecutorUtils)
 */
@VisibleForTesting
// TODO(tpetracca): can this be deleted?
public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ExecutorUtils.shutdownNowAndWait(timeout, unit, StageManager.stages.values());
}

public final static Runnable NO_OP_TASK = () -> {};

/**
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow
Expand Down
7 changes: 1 addition & 6 deletions src/java/org/apache/cassandra/net/MessageOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;

import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER;
import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE;
import static org.apache.cassandra.tracing.Tracing.isTracing;

public class MessageOut<T>
Expand All @@ -61,8 +57,7 @@ public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T>
payload,
serializer,
isTracing()
? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()),
TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) })
? Tracing.instance.getTraceHeaders()
: Collections.<String, byte[]>emptyMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void writeConnected(QueuedMessage qm, boolean flush)
{
byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE);
Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL());
}
else
{
Expand Down
12 changes: 10 additions & 2 deletions src/java/org/apache/cassandra/service/QueryState.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.cassandra.service;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -75,15 +78,20 @@ public void prepareTracingSession(UUID sessionId)
}

/**
 * Starts a tracing session without any custom payload.
 * Equivalent to calling {@link #createTracingSession(Map)} with an empty map.
 */
public void createTracingSession()
{
// Use the typed factory rather than the raw Collections.EMPTY_MAP constant so the
// argument is a proper Map<String, ByteBuffer> and no unchecked conversion is needed.
createTracingSession(Collections.<String, ByteBuffer>emptyMap());
}

public void createTracingSession(Map<String,ByteBuffer> customPayload)
{
UUID session = this.preparedTracingSession;
if (session == null)
{
Tracing.instance.newSession();
Tracing.instance.newSession(customPayload);
}
else
{
Tracing.instance.newSession(session);
Tracing.instance.newSession(session, customPayload);
this.preparedTracingSession = null;
}
}
Expand Down
29 changes: 20 additions & 9 deletions src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,50 @@
/*
*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*
*/

package org.apache.cassandra.tracing;

import java.util.UUID;

import org.apache.cassandra.utils.FBUtilities;

public class ExpiredTraceState extends TraceState
class ExpiredTraceState extends TraceState
{
public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType)
private final TraceState delegate;

ExpiredTraceState(TraceState delegate)
{
super(FBUtilities.getBroadcastAddress(), sessionId, traceType);
super(FBUtilities.getBroadcastAddress(), delegate.sessionId, delegate.traceType);
this.delegate = delegate;
}

/**
 * Always reports {@code -1} instead of a measured elapsed time.
 * NOTE(review): presumably -1 marks the elapsed time as unknown for an expired session; confirm against callers.
 *
 * @return the constant {@code -1}
 */
public int elapsed()
{
return -1;
}

/**
 * Forwards the message unchanged to the {@code traceImpl} of the wrapped {@code delegate}.
 *
 * @param message the trace message to hand to the delegate
 */
protected void traceImpl(String message)
{
delegate.traceImpl(message);
}

/**
 * Forwards the call to {@code waitForPendingEvents} of the wrapped {@code delegate};
 * this class adds no waiting logic of its own.
 */
protected void waitForPendingEvents()
{
delegate.waitForPendingEvents();
}
}
97 changes: 7 additions & 90 deletions src/java/org/apache/cassandra/tracing/TraceState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,16 @@

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressListener;
Expand All @@ -48,12 +37,8 @@
* ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
* operation is being traced.
*/
public class TraceState implements ProgressEventNotifier
public abstract class TraceState implements ProgressEventNotifier
{
private static final Logger logger = LoggerFactory.getLogger(TraceState.class);
private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "0"));

public final UUID sessionId;
public final InetAddress coordinator;
public final Stopwatch watch;
Expand All @@ -78,7 +63,7 @@ public enum Status
// See CASSANDRA-7626 for more details.
private final AtomicInteger references = new AtomicInteger(1);

public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
protected TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
{
assert coordinator != null;
assert sessionId != null;
Expand All @@ -90,7 +75,7 @@ public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType tra
this.ttl = traceType.getTTL();
watch = Stopwatch.createStarted();
this.status = Status.IDLE;
}
}

/**
* Activate notification with provided {@code tag} name.
Expand Down Expand Up @@ -160,7 +145,7 @@ public synchronized Status waitActivity(long timeout)
return status;
}

private synchronized void notifyActivity()
protected synchronized void notifyActivity()
{
status = Status.ACTIVE;
notifyAll();
Expand All @@ -186,85 +171,17 @@ public void trace(String message)
if (notify)
notifyActivity();

final String threadName = Thread.currentThread().getName();
final int elapsed = elapsed();

executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
if (logger.isTraceEnabled())
logger.trace("Adding <{}> to trace events", message);
traceImpl(message);

for (ProgressListener listener : listeners)
{
listener.progress(tag, ProgressEvent.createNotification(message));
}
}

static void executeMutation(final Mutation mutation)
{
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
mutateWithCatch(mutation);
}
});
}

/**
* Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces
* that are not initiated by local node == coordinator).
*/
public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl)
{
final String threadName = Thread.currentThread().getName();

StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
{
mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, elapsed, threadName, ttl));
}
});
}

static void mutateWithCatch(Mutation mutation)
{
try
{
StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY);
}
catch (OverloadedException e)
{
Tracing.logger.warn("Too many nodes are overloaded to save trace events");
}
}

/**
* Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations
* have at least been applied to one replica. This works because the tracking executor only
* has one thread in its pool, see {@link StageManager#tracingExecutor()}.
*/
protected void waitForPendingEvents()
{
if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
return;

try
{
if (logger.isTraceEnabled())
logger.trace("Waiting for up to {} seconds for trace events to complete",
+WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);

StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
.get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.debug("Failed to wait for tracing events to complete: {}", t);
}
}
/**
 * Subclass hook that handles a single trace message. {@code trace(String)} invokes it
 * before passing the message on to the registered progress listeners.
 *
 * @param message the trace message
 */
protected abstract void traceImpl(String message);

/**
 * Subclass hook for waiting on trace events that are still pending.
 * NOTE(review): the exact guarantee (and any timeout) is left to the implementation; confirm against the concrete subclasses.
 */
protected abstract void waitForPendingEvents();

public boolean acquireReference()
{
Expand Down
Loading