Skip to content

Commit

Permalink
Merge branch 'release-3.14.0' into merge_3.12.5
Browse files Browse the repository at this point in the history
  • Loading branch information
morebtcg authored Feb 17, 2025
2 parents 979db2a + 7bdfd24 commit 2c89fa5
Show file tree
Hide file tree
Showing 21 changed files with 328 additions and 323 deletions.
9 changes: 5 additions & 4 deletions bcos-executor/test/unittest/mock/MockTxPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ class MockTxPool : public txpool::TxPoolInterface
{
co_return nullptr;
}
void asyncSealTxs(uint64_t, bcos::txpool::TxsHashSetPtr,
std::function<void(Error::Ptr, bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr)>)
override
{}
std::tuple<bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr> sealTxs(
uint64_t, bcos::txpool::TxsHashSetPtr) override
{
return {};
}
void asyncMarkTxs(const bcos::crypto::HashList&, bool, bcos::protocol::BlockNumber,
bcos::crypto::HashType const&, std::function<void(Error::Ptr)>) override
{}
Expand Down
9 changes: 5 additions & 4 deletions bcos-framework/bcos-framework/testutils/faker/FakeTxPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ class FakeTxPool : public TxPoolInterface
void notifyConnectedNodes(
bcos::crypto::NodeIDSet const&, std::function<void(Error::Ptr)>) override
{}
void asyncSealTxs(uint64_t, TxsHashSetPtr,
std::function<void(Error::Ptr, bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr)>)
override
{}
std::tuple<bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr> sealTxs(
uint64_t, TxsHashSetPtr) override
{
return {};
}

void asyncMarkTxs(const HashList&, bool, bcos::protocol::BlockNumber,
bcos::crypto::HashType const&, std::function<void(Error::Ptr)>) override
Expand Down
5 changes: 2 additions & 3 deletions bcos-framework/bcos-framework/txpool/TxPoolInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ class TxPoolInterface
* @param _sealCallback after the txpool responds to the sealed txs, the callback is
* triggered
*/
virtual void asyncSealTxs(uint64_t _txsLimit, TxsHashSetPtr _avoidTxs,
std::function<void(Error::Ptr, bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr)>
_sealCallback) = 0;
virtual std::tuple<bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr> sealTxs(
uint64_t _txsLimit, TxsHashSetPtr _avoidTxs) = 0;

virtual void asyncMarkTxs(const bcos::crypto::HashList& _txsHash, bool _sealedFlag,
bcos::protocol::BlockNumber _batchId, bcos::crypto::HashType const& _batchHash,
Expand Down
9 changes: 5 additions & 4 deletions bcos-scheduler/test/mock/MockTxPool1.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ class MockTxPool1 : public txpool::TxPoolInterface
co_return nullptr;
}

void asyncSealTxs(uint64_t, bcos::txpool::TxsHashSetPtr,
std::function<void(Error::Ptr, bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr)>)
override
{}
std::tuple<bcos::protocol::Block::Ptr, bcos::protocol::Block::Ptr> sealTxs(
uint64_t, bcos::txpool::TxsHashSetPtr) override
{
return {};
}
void asyncMarkTxs(const bcos::crypto::HashList&, bool, bcos::protocol::BlockNumber,
bcos::crypto::HashType const&, std::function<void(Error::Ptr)>) override
{}
Expand Down
47 changes: 41 additions & 6 deletions bcos-sealer/bcos-sealer/Sealer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@
#include "VRFBasedSealer.h"
#include "bcos-framework/ledger/Features.h"
#include <bcos-framework/protocol/GlobalConfig.h>
#include <chrono>
#include <range/v3/view/transform.hpp>
#include <utility>

using namespace bcos;
using namespace bcos::sealer;
using namespace bcos::protocol;
namespace bcos::sealer

bcos::sealer::Sealer::Sealer(SealerConfig::Ptr _sealerConfig)
: Worker("Sealer", 0),
m_sealerConfig(std::move(_sealerConfig)),
m_lastFetchTimepoint(std::chrono::steady_clock::now())
{
class VRFBasedSealer;
m_sealingManager = std::make_shared<SealingManager>(m_sealerConfig);
m_sealingManager->onReady([this]() { noteGenerateProposal(); });
m_hashImpl = m_sealerConfig->blockFactory()->cryptoSuite()->hashImpl();
}

void Sealer::start()
Expand Down Expand Up @@ -97,7 +104,23 @@ void Sealer::asyncNoteLatestBlockHash(crypto::HashType _hash)
void Sealer::executeWorker()
{
// try to fetch transactions
m_sealingManager->fetchTransactions();
if (m_sealingManager->fetchTransactions() == SealingManager::FetchResult::NO_TRANSACTION)
{
// 轮到本节点出块,且超过一定时间取不到交易,广播交易同步请求
// When it is this node's turn to mint a block, and no transactions can be obtained after a
// certain period of time, broadcast a transaction synchronization request.
if (auto duration = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_lastFetchTimepoint.load());
duration > std::chrono::seconds(m_fetchTimeout))
{
increseLastFetchTimepoint();
m_sealerConfig->txpool()->tryToSyncTxsFromPeers();
}
}
else
{
increseLastFetchTimepoint();
}

// try to generateProposal
if (m_sealingManager->shouldGenerateProposal())
Expand All @@ -106,9 +129,7 @@ void Sealer::executeWorker()
[this](bcos::protocol::Block::Ptr _block) -> uint16_t {
return hookWhenSealBlock(std::move(_block));
});
submitProposal(ret.first, ret.second);

// TODO: pullTxsTimer's work
submitProposal(ret.first, std::move(ret.second));
}
else
{
Expand Down Expand Up @@ -189,3 +210,17 @@ uint16_t Sealer::hookWhenSealBlock(bcos::protocol::Block::Ptr _block)
m_sealerConfig->consensus()->consensusConfig()->features().get(
ledger::Features::Flag::bugfix_rpbft_vrf_blocknumber_input));
}

std::chrono::steady_clock::time_point Sealer::increseLastFetchTimepoint()
{
auto now = std::chrono::steady_clock::now();
auto current = m_lastFetchTimepoint.load();
while (now > current && !m_lastFetchTimepoint.compare_exchange_strong(current, now))
{
}
return current;
}
void bcos::sealer::Sealer::setSealingManager(SealingManager::Ptr _sealingManager)
{
m_sealingManager = std::move(_sealingManager);
}
23 changes: 12 additions & 11 deletions bcos-sealer/bcos-sealer/Sealer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,22 @@
#include "SealingManager.h"
#include "bcos-framework/sealer/SealerInterface.h"
#include <bcos-utilities/Worker.h>
#include <chrono>
#include <utility>

namespace bcos::sealer
{
class Sealer : public Worker, public SealerInterface, public std::enable_shared_from_this<Sealer>
{
public:
enum SealBlockResult : uint16_t
enum SealBlockResult : int8_t
{
FAILED = 0,
SUCCESS = 1,
WAIT_FOR_LATEST_BLOCK = 2,
};
using Ptr = std::shared_ptr<Sealer>;
explicit Sealer(SealerConfig::Ptr _sealerConfig)
: Worker("Sealer", 0), m_sealerConfig(std::move(_sealerConfig))
{
m_sealingManager = std::make_shared<SealingManager>(m_sealerConfig);
m_sealingManager->onReady([=, this]() { this->noteGenerateProposal(); });
m_hashImpl = m_sealerConfig->blockFactory()->cryptoSuite()->hashImpl();
}
explicit Sealer(SealerConfig::Ptr _sealerConfig);
~Sealer() override = default;

void start() override;
Expand All @@ -62,24 +57,30 @@ class Sealer : public Worker, public SealerInterface, public std::enable_shared_
virtual void init(bcos::consensus::ConsensusInterface::Ptr _consensus);

uint16_t hookWhenSealBlock([[maybe_unused]] bcos::protocol::Block::Ptr _block) override;
void setFetchTimeout(int fetchTimeout) { m_fetchTimeout = fetchTimeout; }

// only for test
SealerConfig::Ptr sealerConfig() const { return m_sealerConfig; }
SealingManager::Ptr sealingManager() const { return m_sealingManager; }

protected:
void executeWorker() override;
virtual void noteGenerateProposal() { m_signalled.notify_all(); }
void setSealingManager(SealingManager::Ptr _sealingManager);

virtual void submitProposal(bool _containSysTxs, bcos::protocol::Block::Ptr _proposal);
protected:
virtual void noteGenerateProposal() { m_signalled.notify_all(); }
virtual void submitProposal(bool _containSysTxs, bcos::protocol::Block::Ptr _block);

SealerConfig::Ptr m_sealerConfig;
SealingManager::Ptr m_sealingManager;
std::atomic_bool m_running = {false};

boost::condition_variable m_signalled;
// mutex to access m_signalled
std::atomic<std::chrono::steady_clock::time_point> m_lastFetchTimepoint;
int m_fetchTimeout = 5; // Default timeout 5s
boost::mutex x_signalled;
bcos::crypto::Hash::Ptr m_hashImpl;

std::chrono::steady_clock::time_point increseLastFetchTimepoint();
};
} // namespace bcos::sealer
112 changes: 45 additions & 67 deletions bcos-sealer/bcos-sealer/SealingManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "SealingManager.h"
#include "Common.h"
#include "Sealer.h"
#include "bcos-framework/protocol/CommonError.h"

using namespace bcos;
using namespace bcos::sealer;
Expand Down Expand Up @@ -231,93 +232,70 @@ int64_t SealingManager::txsSizeExpectedToFetch()
return (txsSizeToFetch - txsSize);
}

void SealingManager::fetchTransactions()
bcos::sealer::SealingManager::FetchResult SealingManager::fetchTransactions()
{
// fetching transactions currently
auto lock = std::unique_lock{m_fetchingTxsMutex, std::try_to_lock};
if (!lock.owns_lock())
{
return;
return FetchResult::NOT_READY;
}
// no need to sealing
if (m_sealingNumber < m_startSealingNumber || m_sealingNumber > m_endSealingNumber)
{
return;
return FetchResult::NOT_READY;
}

auto txsToFetch = txsSizeExpectedToFetch();
if (txsToFetch == 0)
{
return;
return FetchResult::NOT_READY;
}
// try to fetch transactions
ssize_t startSealingNumber = m_startSealingNumber;
ssize_t endSealingNumber = m_endSealingNumber;
m_config->txpool()->asyncSealTxs(txsToFetch, nullptr,
[self = weak_from_this(), startSealingNumber, endSealingNumber,
lock = std::make_shared<decltype(lock)>(std::move(lock))](
Error::Ptr _error, Block::Ptr _txsHashList, Block::Ptr _sysTxsList) {
try
{
auto sealingMgr = self.lock();
if (!sealingMgr)
{
return;
}

if (_txsHashList->transactionsHashSize() == 0 &&
_sysTxsList->transactionsHashSize() == 0)
{
return;
}
if (_error != nullptr)
{
SEAL_LOG(WARNING) << LOG_DESC("fetchTransactions exception")
<< LOG_KV("returnCode", _error->errorCode())
<< LOG_KV("returnMsg", _error->errorMessage());
return;
}
bool abort = true;
if ((sealingMgr->m_sealingNumber >= startSealingNumber) &&
(sealingMgr->m_sealingNumber <= endSealingNumber))
{
sealingMgr->appendTransactions(sealingMgr->m_pendingTxs, *_txsHashList);
sealingMgr->appendTransactions(sealingMgr->m_pendingSysTxs, *_sysTxsList);
abort = false;
}
else
{
SEAL_LOG(INFO) << LOG_DESC("fetchTransactions finish: abort the expired txs")
<< LOG_KV("txsSize", _txsHashList->transactionsMetaDataSize())
<< LOG_KV("sysTxsSize", _sysTxsList->transactionsMetaDataSize());
// Note: should reset the aborted txs
sealingMgr->notifyResetProposal(*_txsHashList);
sealingMgr->notifyResetProposal(*_sysTxsList);
}
try
{
auto [_txsHashList, _sysTxsList] = m_config->txpool()->sealTxs(txsToFetch, nullptr);
if (_txsHashList->transactionsHashSize() == 0 && _sysTxsList->transactionsHashSize() == 0)
{
return FetchResult::NO_TRANSACTION;
}

sealingMgr->m_onReady();
SEAL_LOG(DEBUG) << LOG_DESC("fetchTransactions finish")
<< LOG_KV("txsSize", _txsHashList->transactionsMetaDataSize())
<< LOG_KV("sysTxsSize", _sysTxsList->transactionsMetaDataSize())
<< LOG_KV("pendingSize", sealingMgr->m_pendingTxs.size())
<< LOG_KV("pendingSysSize", sealingMgr->m_pendingSysTxs.size())
<< LOG_KV("startSealingNumber", startSealingNumber)
<< LOG_KV("endSealingNumber", endSealingNumber)
<< LOG_KV("sealingNumber", sealingMgr->m_sealingNumber)
<< LOG_KV("abort", abort);
}
catch (std::exception const& e)
{
SEAL_LOG(WARNING) << LOG_DESC("fetchTransactions: onRecv sealed txs failed")
<< LOG_KV("message", boost::diagnostic_information(e))
<< LOG_KV(
"fetchedTxsSize", _txsHashList->transactionsMetaDataSize())
<< LOG_KV(
"fetchedSysTxs", _sysTxsList->transactionsMetaDataSize())
<< LOG_KV("returnCode", _error->errorCode())
<< LOG_KV("returnMsg", _error->errorMessage());
}
});
bool abort = true;
if ((m_sealingNumber >= startSealingNumber) && (m_sealingNumber <= endSealingNumber))
{
appendTransactions(m_pendingTxs, *_txsHashList);
appendTransactions(m_pendingSysTxs, *_sysTxsList);
abort = false;
}
else
{
SEAL_LOG(INFO) << LOG_DESC("fetchTransactions finish: abort the expired txs")
<< LOG_KV("txsSize", _txsHashList->transactionsMetaDataSize())
<< LOG_KV("sysTxsSize", _sysTxsList->transactionsMetaDataSize());
// Note: should reset the aborted txs
notifyResetProposal(*_txsHashList);
notifyResetProposal(*_sysTxsList);
}

m_onReady();
SEAL_LOG(DEBUG) << LOG_DESC("fetchTransactions finish")
<< LOG_KV("txsSize", _txsHashList->transactionsMetaDataSize())
<< LOG_KV("sysTxsSize", _sysTxsList->transactionsMetaDataSize())
<< LOG_KV("pendingSize", m_pendingTxs.size())
<< LOG_KV("pendingSysSize", m_pendingSysTxs.size())
<< LOG_KV("startSealingNumber", startSealingNumber)
<< LOG_KV("endSealingNumber", endSealingNumber)
<< LOG_KV("sealingNumber", m_sealingNumber) << LOG_KV("abort", abort);
}
catch (std::exception const& e)
{
SEAL_LOG(WARNING) << LOG_DESC("fetchTransactions: onRecv sealed txs failed")
<< LOG_KV("message", boost::diagnostic_information(e));
}
return FetchResult::SUCCESS;
}

bcos::sealer::SealingManager::SealingManager(SealerConfig::Ptr _config)
Expand Down
9 changes: 8 additions & 1 deletion bcos-sealer/bcos-sealer/SealingManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ class SealingManager : public std::enable_shared_from_this<SealingManager>
virtual void resetLatestHash(crypto::HashType _latestHash);
virtual int64_t latestNumber() const;
virtual crypto::HashType latestHash() const;
virtual void fetchTransactions();

enum class FetchResult : int8_t
{
SUCCESS,
NOT_READY,
NO_TRANSACTION,
};
virtual FetchResult fetchTransactions();

template <class T>
bcos::Handler<> onReady(T callback)
Expand Down
Loading

0 comments on commit 2c89fa5

Please sign in to comment.