Skip to content

Commit

Permalink
Merge pull request #10141 from Icinga/log-slow-http-rpc-processing
Browse files Browse the repository at this point in the history
Log HTTP/RPC message processing stats
  • Loading branch information
julianbrost authored Nov 27, 2024
2 parents 4b884ea + 4564c06 commit 57df92f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 15 deletions.
17 changes: 13 additions & 4 deletions lib/remote/httpserverconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,17 @@ bool ProcessRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
HttpServerConnection& server,
bool& hasStartedStreaming,
std::chrono::steady_clock::duration& cpuBoundWorkTime,
boost::asio::yield_context& yc
)
{
namespace http = boost::beast::http;

try {
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
auto start (std::chrono::steady_clock::now());
CpuBoundWork handlingRequest (yc);
cpuBoundWorkTime = std::chrono::steady_clock::now() - start;

HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, server);
} catch (const std::exception& ex) {
Expand Down Expand Up @@ -530,9 +534,14 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
<< ", user: " << (authenticatedUser ? authenticatedUser->GetName() : "<unauthenticated>")
<< ", agent: " << request[http::field::user_agent]; //operator[] - Returns the value for a field, or "" if it does not exist.

Defer addRespCode ([&response, start, &logMsg]() {
logMsg << ", status: " << response.result() << ") took "
<< ch::duration_cast<ch::milliseconds>(ch::steady_clock::now() - start).count() << "ms.";
ch::steady_clock::duration cpuBoundWorkTime(0);
Defer addRespCode ([&response, start, &logMsg, &cpuBoundWorkTime]() {
logMsg << ", status: " << response.result() << ")";
if (cpuBoundWorkTime >= ch::seconds(1)) {
logMsg << " waited " << ch::duration_cast<ch::milliseconds>(cpuBoundWorkTime).count() << "ms on semaphore and";
}

logMsg << " took total " << ch::duration_cast<ch::milliseconds>(ch::steady_clock::now() - start).count() << "ms.";
});

if (!HandleAccessControl(*m_Stream, request, response, yc)) {
Expand All @@ -553,7 +562,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)

m_Seen = std::numeric_limits<decltype(m_Seen)>::max();

if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, yc)) {
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, cpuBoundWorkTime, yc)) {
break;
}

Expand Down
66 changes: 56 additions & 10 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,19 @@ void JsonRpcConnection::Start()

void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
namespace ch = std::chrono;

auto toMilliseconds ([](ch::steady_clock::duration d) {
return ch::duration_cast<ch::milliseconds>(d).count();
});

m_Stream->next_layer().SetSeen(&m_Seen);

while (!m_ShuttingDown) {
String message;
String jsonString;

try {
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
Expand All @@ -76,17 +82,50 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
}

m_Seen = Utility::GetTime();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
}

String rpcMethod("UNKNOWN");
ch::steady_clock::duration cpuBoundDuration(0);
auto start (ch::steady_clock::now());

try {
CpuBoundWork handleMessage (yc);

// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
}

MessageHandler(message);

l_TaskStats.InsertValue(Utility::GetTime(), 1);

auto total = ch::steady_clock::now() - start;

Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
<< "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}
msg << ").";
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
auto total = ch::steady_clock::now() - start;

Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
<< m_Identity << "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}
msg << "): " << DiagnosticInformation(ex);

break;
}
Expand Down Expand Up @@ -259,10 +298,19 @@ void JsonRpcConnection::Disconnect()
}
}

void JsonRpcConnection::MessageHandler(const String& jsonString)
/**
* Route the provided message to its corresponding handler (if any).
*
* This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose
* timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log
* position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or
* referring to a non-existent one is also discarded. Afterward, the RPC handler is then called for that message
* and sends it's result back to the sender if the message contains an ID.
*
* @param message The RPC message you want to process.
*/
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);

if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");

Expand All @@ -281,8 +329,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
origin->FromZone = m_Endpoint->GetZone();
else
origin->FromZone = Zone::GetByName(message->Get("originZone"));

m_Endpoint->AddMessageReceived(jsonString.GetLength());
}

Value vmethod;
Expand Down
3 changes: 2 additions & 1 deletion lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class JsonRpcConnection final : public Object
void CheckLiveness(boost::asio::yield_context yc);

bool ProcessMessage();
void MessageHandler(const String& jsonString);

void MessageHandler(const Dictionary::Ptr& message);

void CertificateRequestResponseHandler(const Dictionary::Ptr& message);

Expand Down

0 comments on commit 57df92f

Please sign in to comment.