Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate to SafeLogging in org.apache.cassandra.streaming #592

Open
wants to merge 3 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/java/org/apache/cassandra/streaming/ConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
Expand Down Expand Up @@ -80,11 +82,13 @@ public class ConnectionHandler
@SuppressWarnings("resource")
public void initiate() throws Exception
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Sending stream init for incoming stream", SafeArg.of("planId", session.planId()));
Socket incomingSocket = session.createConnection();
incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);

logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Sending stream init for outgoing stream", SafeArg.of("planId", session.planId()));
Socket outgoingSocket = session.createConnection();
outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
Expand All @@ -106,7 +110,10 @@ public void initiateOnReceivingSide(IncomingStreamingConnection connection, bool

public ListenableFuture<?> close()
{
logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Closing stream connection handler on {}",
SafeArg.of("planId", session.planId()),
SafeArg.of("peer", session.peer));

ListenableFuture<?> inClosed = closeIncoming();
ListenableFuture<?> outClosed = closeOutgoing();
Expand Down Expand Up @@ -290,7 +297,10 @@ public void run()
{
// receive message
StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
logger.debug("[Stream #{}] Received {}", session.planId(), message);
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Received {}",
SafeArg.of("planId", session.planId()),
SafeArg.of("message", message));
// Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
// to ignore here since we'll have asked for a retry.
if (message != null)
Expand Down Expand Up @@ -357,7 +367,10 @@ public void run()
{
if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null)
{
logger.debug("[Stream #{}] Sending {}", session.planId(), next);
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Sending {}",
SafeArg.of("planId", session.planId()),
SafeArg.of("message", next));
sendMessage(out, next);
if (next.type == StreamMessage.Type.SESSION_FAILED)
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.OutboundTcpConnectionPool;

Expand Down Expand Up @@ -60,7 +61,10 @@ public Socket createConnection(InetAddress peer) throws Exception
throw e;

long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms. ({})", attempts, peer, waitms, e);
logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms.",
SafeArg.of("attempts", attempts),
SafeArg.of("peer", peer),
SafeArg.of("waitms", waitms), e);
try
{
Thread.sleep(waitms);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
Expand Down Expand Up @@ -216,7 +217,10 @@ public StreamSessionConnector(StreamSession session)
public void run()
{
session.start();
logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
logger.info("[Stream #{}, ID#{}] Beginning stream session with {}",
SafeArg.of("planId", session.planId()),
SafeArg.of("sessionIndex", session.sessionIndex()),
SafeArg.of("peer", session.peer));
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/java/org/apache/cassandra/streaming/StreamLockfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.UUID;

import com.google.common.base.Charsets;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,7 +88,7 @@ this class will not need to clean up tmp files (on restart), CassandraDaemon doe
}
catch (IOException e)
{
logger.warn(String.format("Could not create lockfile %s for stream session, nothing to worry too much about", lockfile), e);
logger.warn("Could not create lockfile {} for stream session, nothing to worry too much about", SafeArg.of("lockfile", lockfile.toString()), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file path marked as safe here, which is inconsistent with the other files in this PR, where I believe we decided file paths are unsafe

}
}

Expand All @@ -107,7 +110,7 @@ public void cleanup()
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
logger.warn("failed to delete a potentially stale sstable {}", file);
logger.warn("failed to delete a potentially stale sstable {}", UnsafeArg.of("file", file));
}
}
}
Expand Down
29 changes: 23 additions & 6 deletions src/java/org/apache/cassandra/streaming/StreamReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.UUID;

import com.google.common.base.Throwables;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.format.Version;
Expand Down Expand Up @@ -99,9 +102,15 @@ public SSTableWriter read(ReadableByteChannel channel) throws IOException
throw new IOException("CF " + cfId + " was dropped during streaming");
}

logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
cfs.getColumnFamilyName());
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
SafeArg.of("planId", session.planId()),
SafeArg.of("fileSeqNum", fileSeqNum),
SafeArg.of("peer", session.peer),
SafeArg.of("repairedAt", repairedAt),
SafeArg.of("size", totalSize),
SafeArg.of("keyspace", cfs.keyspace.getName()),
SafeArg.of("table", cfs.getColumnFamilyName()));

DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
Expand All @@ -118,14 +127,22 @@ public SSTableWriter read(ReadableByteChannel channel) throws IOException
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
SafeArg.of("planId", session.planId()),
SafeArg.of("fileSeqNum", fileSeqNum),
SafeArg.of("peer", session.peer),
SafeArg.of("readBytes", in.getBytesRead()),
SafeArg.of("totalSize", totalSize));
return writer;
} catch (Throwable e)
{
if (key != null)
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
SafeArg.of("planId", session.planId()),
UnsafeArg.of("key", key),
SafeArg.of("keyspace", cfs.keyspace.getName()),
SafeArg.of("columnFamily", cfs.getColumnFamilyName()));
if (writer != null)
{
try
Expand Down
21 changes: 15 additions & 6 deletions src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -151,18 +152,26 @@ public void run()
{
int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
if (invalidatedKeys > 0)
logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
"receive task completed.", task.session.planId(), invalidatedKeys,
cfs.keyspace.getName(), cfs.getColumnFamilyName());
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
"receive task completed.",
SafeArg.of("planId", task.session.planId()),
SafeArg.of("invalidatedKeys", invalidatedKeys),
SafeArg.of("keyspace", cfs.keyspace.getName()),
SafeArg.of("table", cfs.getColumnFamilyName()));
}

if (cfs.metadata.isCounter())
{
int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
if (invalidatedKeys > 0)
logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
"receive task completed.", task.session.planId(), invalidatedKeys,
cfs.keyspace.getName(), cfs.getColumnFamilyName());
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
"receive task completed.",
SafeArg.of("planId", task.session.planId()),
SafeArg.of("invalidatedKeys", invalidatedKeys),
SafeArg.of("keyspace", cfs.keyspace.getName()),
SafeArg.of("table", cfs.getColumnFamilyName()));
}
}
}
Expand Down
33 changes: 21 additions & 12 deletions src/java/org/apache/cassandra/streaming/StreamResultFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.net.IncomingStreamingConnection;

/**
Expand Down Expand Up @@ -85,7 +86,7 @@ static StreamResultFuture init(UUID planId, String description, Collection<Strea
future.addEventListener(listener);
}

logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
logger.info("[Stream #{}] Executing streaming plan for {}", SafeArg.of("planId", planId), SafeArg.of("description", description));

// Initialize and start all sessions
for (final StreamSession session : coordinator.getAllStreamSessions())
Expand All @@ -110,14 +111,20 @@ public static synchronized StreamResultFuture initReceivingSide(int sessionIndex
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}",
SafeArg.of("planId", planId),
SafeArg.of("sessionIndex", sessionIndex),
SafeArg.of("description", description));

// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
StreamManager.instance.registerReceiving(future);
}
future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {}",
SafeArg.of("planId", planId),
SafeArg.of("sessionIndex", sessionIndex),
SafeArg.of("description", description));
return future;
}

Expand Down Expand Up @@ -168,20 +175,22 @@ void handleSessionPrepared(StreamSession session)
{
SessionInfo sessionInfo = session.getSessionInfo();
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
session.planId(),
session.sessionIndex(),
sessionInfo.getTotalFilesToReceive(),
sessionInfo.getTotalSizeToReceive(),
sessionInfo.getTotalFilesToSend(),
sessionInfo.getTotalSizeToSend());
SafeArg.of("planId", session.planId()),
SafeArg.of("sessionIndex", session.sessionIndex()),
SafeArg.of("totalFilesToReceive", sessionInfo.getTotalFilesToReceive()),
SafeArg.of("totalSizeToReceive", sessionInfo.getTotalSizeToReceive()),
SafeArg.of("totalFilesToSend", sessionInfo.getTotalFilesToSend()),
SafeArg.of("totalSizeToSend", sessionInfo.getTotalSizeToSend()));
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}

void handleSessionComplete(StreamSession session)
{
logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
logger.info("[Stream #{}] Session with {} is complete",
SafeArg.of("planId", session.planId()),
SafeArg.of("peer", session.peer));
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
SessionInfo sessionInfo = session.getSessionInfo();
coordinator.addSessionInfo(sessionInfo);
Expand All @@ -208,12 +217,12 @@ private synchronized void maybeComplete()
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
logger.warn("[Stream #{}] Stream failed", planId);
logger.warn("[Stream #{}] Stream failed", SafeArg.of("planId", planId));
setException(new StreamException(finalState, "Stream failed"));
}
else
{
logger.info("[Stream #{}] All sessions completed", planId);
logger.info("[Stream #{}] All sessions completed", SafeArg.of("planId", planId));
set(finalState);
}
}
Expand Down
Loading