Skip to content

Commit

Permalink
Fix p2p crash bug (FISCO-BCOS#4827)
Browse files Browse the repository at this point in the history
Co-authored-by: cyjseagull <[email protected]>
  • Loading branch information
morebtcg and cyjseagull authored Feb 24, 2025
1 parent 99839f9 commit a461217
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
31 changes: 15 additions & 16 deletions bcos-gateway/bcos-gateway/libnetwork/ASIOInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,21 @@ class ASIOInterface
if (socket->isConnected())
{
auto& ioService = socket->ioService();
ioService.post(
[type, socket = socket, buffers, handler = std::move(handler)]() mutable {
switch (type)
{
case TCP_ONLY:
{
ba::async_write(socket->ref(), buffers, std::move(handler));
break;
}
case SSL:
{
ba::async_write(socket->sslref(), buffers, std::move(handler));
break;
}
}
});
ioService.post([type, socket, &buffers, handler = std::move(handler)]() mutable {
switch (type)
{
case TCP_ONLY:
{
ba::async_write(socket->ref(), buffers, std::move(handler));
break;
}
case SSL:
{
ba::async_write(socket->sslref(), buffers, std::move(handler));
break;
}
}
});
}
}

Expand Down
26 changes: 13 additions & 13 deletions bcos-gateway/bcos-gateway/libnetwork/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Session::Session(
m_server(server),
m_socket(std::move(socket)),
m_idleCheckTimer(
std::make_shared<Timer>(m_socket->ioService(), m_idleTimeInterval, "idleChecker"))
std::make_shared<Timer>(m_socket->ioService(), m_idleTimeInterval, "idleChecker")),
m_writings(std::make_shared<Writings>())
{
SESSION_LOG(INFO) << "[Session::Session] this=" << this
<< LOG_KV("recvBufferSize", m_maxRecvBufferSize);
Expand Down Expand Up @@ -118,7 +119,7 @@ void Session::asyncSendMessage(Message::Ptr message, Options options, SessionCal
{
if (callback && result.has_value())
{
auto error = result.value();
const auto& error = result.value();
auto errorCode = error.errorCode();
auto errorMessage = error.errorMessage();
m_server.get().asyncTo([callback = std::move(callback), errorCode,
Expand Down Expand Up @@ -277,34 +278,33 @@ void Session::write()

try
{
bool writing = false;
if (!m_writing.compare_exchange_strong(writing, true))
if (m_writing.test_and_set())
{
return;
}
std::unique_ptr<std::atomic_bool, decltype([](std::atomic_bool* ptr) { *ptr = false; })>
std::unique_ptr<boost::atomic_flag, decltype([](boost::atomic_flag* ptr) { ptr->clear(); })>
defer(std::addressof(m_writing));

if (!tryPopSomeEncodedMsgs(m_writingPayloads, m_maxSendDataSize, m_maxSendMsgCountS))
if (!tryPopSomeEncodedMsgs(m_writings->payloads, m_maxSendDataSize, m_maxSendMsgCountS))
{
return;
}

boost::container::small_vector<boost::asio::const_buffer, 16> buffers;
auto outputIt = std::back_inserter(buffers);
for (auto& payload : m_writingPayloads)
auto outputIt = std::back_inserter(m_writings->buffers);
for (auto& payload : m_writings->payloads)
{
payload.toConstBuffer(outputIt);
}
defer.release(); // NOLINT
m_server.get().asioInterface()->asyncWrite(m_socket, buffers,
[self = std::weak_ptr<Session>(shared_from_this())](
m_server.get().asioInterface()->asyncWrite(m_socket, m_writings->buffers,
[self = std::weak_ptr<Session>(shared_from_this()), writings = m_writings](
const boost::system::error_code _error, std::size_t _size) mutable {
if (auto session = self.lock())
{
std::vector<Payload> payloads;
payloads.swap(session->m_writingPayloads);
session->m_writing = false;
payloads.swap(session->m_writings->payloads);
session->m_writings->buffers.clear();
session->m_writing.clear();
session->onWrite(_error, _size);

for (auto& payload : payloads)
Expand Down
15 changes: 8 additions & 7 deletions bcos-gateway/bcos-gateway/libnetwork/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,8 @@ class Session : public SessionFace,
std::shared_ptr<SocketFace> m_socket; ///< Socket of peer's connection.

MessageFactory::Ptr m_messageFactory;

tbb::concurrent_queue<Payload> m_writeQueue;
std::atomic_bool m_writing = false;

mutable bcos::Mutex x_info;

boost::atomic_flag m_writing;
bool m_active = false;

SessionCallbackManagerInterface::Ptr m_sessionCallbackManager;
Expand All @@ -321,13 +317,18 @@ class Session : public SessionFace,
std::shared_ptr<bcos::Timer> m_idleCheckTimer;
P2PInfo m_hostInfo;

std::vector<Payload> m_writingPayloads;
struct Writings
{
std::vector<Payload> payloads;
std::vector<boost::asio::const_buffer> buffers;
};
std::shared_ptr<Writings> m_writings;
};

class SessionFactory
{
public:
SessionFactory(P2PInfo const& _hostInfo, uint32_t _sessionRecvBufferSize, // NOLINT
SessionFactory(P2PInfo _hostInfo, uint32_t _sessionRecvBufferSize, // NOLINT
uint32_t _allowMaxMsgSize, uint32_t _maxReadDataSize, uint32_t _maxSendDataSize,
uint32_t _maxSendMsgCountS, bool _enableCompress)
: m_hostInfo(std::move(_hostInfo)),
Expand Down

0 comments on commit a461217

Please sign in to comment.