diff --git a/conanfile.py b/conanfile.py
index 071f961..e3ebf5e 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -28,7 +28,7 @@ class IOMgrConan(ConanFile):
         'shared': False,
         'fPIC': True,
         'coverage': False,
-        'grpc_support': True,
+        'grpc_support': False,
         'sanitize': False,
         'spdk': True,
         'testing': 'epoll_mode',
diff --git a/prepare.sh b/prepare.sh
index b4e90ce..163a414 100755
--- a/prepare.sh
+++ b/prepare.sh
@@ -10,8 +10,8 @@
 echo -n "dpdk."
 conan export 3rd_party/dpdk
 echo -n "fio."
 conan export 3rd_party/fio
-echo -n "gprc_internal."
-conan export 3rd_party/grpc_internal
+#echo -n "gprc_internal."
+#conan export 3rd_party/grpc_internal
 echo -n "spdk."
 conan export 3rd_party/spdk
diff --git a/src/include/iomgr/iomgr.hpp b/src/include/iomgr/iomgr.hpp
index 1ec49a8..0579fff 100644
--- a/src/include/iomgr/iomgr.hpp
+++ b/src/include/iomgr/iomgr.hpp
@@ -27,6 +27,7 @@
 #include
 #include
+#include <folly/Executor.h>
 #include
 #include
 #include
@@ -44,6 +45,25 @@
 #include
 #include
 
+namespace folly {
+
+/**
+ * @class folly::IOManagerExecutor
+ *
+ * A folly::Executor to pass to `.via()` when returning a folly::SemiFuture
+ * for runtime erasure.
+ *
+ **/
+class IOManagerExecutor : public Executor {
+    iomgr::io_fiber_t _fiber;
+
+public:
+    IOManagerExecutor(iomgr::io_fiber_t fiber) : _fiber(fiber) {}
+    void add(Func fn) override;
+};
+
+} // namespace folly
+
 namespace iomgr {
 
 struct timer_info;
@@ -99,6 +119,8 @@ class IOManager {
     friend class IOManagerSpdkImpl;
     friend class IOManagerEpollImpl;
+    friend void folly::IOManagerExecutor::add(folly::Func);
+
     static IOManager& instance() {
         static IOManager inst;
         return inst;
     }
@@ -253,6 +275,8 @@ class IOManager {
         }
     }
 
+    folly::IOManagerExecutor* fiberExecutor(io_fiber_t fiber);
+
     ///////////////////////////// Access related methods /////////////////////////////
     GenericIOInterface* generic_interface() { return m_default_general_iface.get(); }
     GrpcInterface* grpc_interface() { return m_default_grpc_iface.get(); }
@@ -371,6 +395,7 @@
 
     std::shared_mutex m_iface_list_mtx;
     std::vector< std::shared_ptr< IOInterface > > m_iface_list;
+    std::map< io_fiber_t, folly::IOManagerExecutor > m_fiber_executors;
 
     std::vector< std::shared_ptr< DriveInterface > > m_drive_ifaces;
     std::shared_ptr< GenericIOInterface > m_default_general_iface;
diff --git a/src/lib/iomgr.cpp b/src/lib/iomgr.cpp
index 25b4195..04a5635 100644
--- a/src/lib/iomgr.cpp
+++ b/src/lib/iomgr.cpp
@@ -222,6 +222,7 @@ void IOManager::stop() {
         // m_default_grpc_iface.reset();
         m_drive_ifaces.clear();
         m_iface_list.clear();
+        m_fiber_executors.clear();
     } catch (const std::exception& e) { LOGCRITICAL_AND_FLUSH("Caught exception {} during clear lists", e.what()); }
     assert(get_state() == iomgr_state::stopped);
 
@@ -314,6 +315,11 @@ void IOManager::foreach_interface(const interface_cb_t& iface_cb) {
     }
 }
 
+folly::IOManagerExecutor* IOManager::fiberExecutor(io_fiber_t fiber) {
+    std::shared_lock lg(m_iface_list_mtx);
+    return &m_fiber_executors.at(fiber);
+}
+
 void IOManager::_run_io_loop(int iomgr_slot_num, loop_type_t loop_type, uint32_t num_fibers, const std::string& name,
                              const iodev_selector_t& iodev_selector, thread_state_notifier_t&& addln_notifier) {
     loop_type_t ltype = loop_type;
@@ -336,6 +342,11 @@ void IOManager::reactor_started(shared< IOReactor > reactor) {
     m_yet_to_stop_nreactors.increment();
     if (reactor->is_worker()) {
         m_worker_reactors[reactor->m_worker_slot_num] = reactor;
+        for (auto const& fiber : reactor->sync_io_capable_fibers()) {
+            auto lg = std::scoped_lock(m_iface_list_mtx);
+            auto [_, happened] = m_fiber_executors.emplace(fiber, folly::IOManagerExecutor(fiber));
+            RELEASE_ASSERT(happened, "Failed to bind folly::Executor!");
+        }
         reactor->notify_thread_state(true);
 
         // All iomgr created reactors are initialized, move iomgr to sys init (next phase of start)
@@ -374,6 +385,15 @@ int IOManager::run_on_forget(io_fiber_t fiber, spdk_msg_signature_t fn, void* co
     return 1;
 }
 
+} // namespace iomgr
+
+namespace folly {
+void IOManagerExecutor::add(Func fn) {
+    iomanager.send_msg(_fiber, iomgr::iomgr_msg::create(std::move(fn).asStdFunction()));
+}
+} // namespace folly
+
+namespace iomgr {
 int IOManager::send_msg(io_fiber_t fiber, iomgr_msg* msg) {
     int ret{0};
     if (fiber->spdk_thr) {
diff --git a/src/test/test_fiber_shared_mutex.cpp b/src/test/test_fiber_shared_mutex.cpp
index 55e2f1e..811196f 100644
--- a/src/test/test_fiber_shared_mutex.cpp
+++ b/src/test/test_fiber_shared_mutex.cpp
@@ -47,9 +47,7 @@ class SharedMutexTest : public testing::Test {
     iomgr::FiberManagerLib::shared_mutex m_cb_mtx;
     std::vector< iomgr::io_fiber_t > m_fibers;
    uint64_t m_count_per_fiber{0};
-    std::mutex m_test_done_mtx;
-    std::condition_variable m_test_done_cv;
-    uint32_t m_test_count{0};
+    std::atomic_uint32_t m_test_count{0};
 
 protected:
     void SetUp() override {
@@ -88,10 +86,7 @@ class SharedMutexTest : public testing::Test {
         }
         LOGINFO("Fiber completed {} of exclusive locks", m_count_per_fiber);
 
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }
 
     void all_reader() {
@@ -100,10 +95,7 @@ class SharedMutexTest : public testing::Test {
         }
         LOGINFO("Fiber completed {} of shared locks", m_count_per_fiber);
 
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }
 
     void random_reader_writer() {
@@ -124,10 +116,7 @@ class SharedMutexTest : public testing::Test {
         }
         LOGINFO("Fiber completed shared_locks={} exclusive_locks={}", read_count, write_count);
 
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }
 
     void write_once() {
@@ -145,26 +134,27 @@ class SharedMutexTest : public testing::Test {
 };
 
 TEST_F(SharedMutexTest, single_writer_multiple_readers) {
-    iomanager.run_on_forget(m_fibers[0], [this]() { all_writer(); });
+    auto e = folly::QueuedImmediateExecutor();
+    auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
+    calls.push_back(
+        folly::makeSemiFuture().via(iomanager.fiberExecutor(m_fibers[0])).thenValue([this](auto) { all_writer(); }));
     for (auto it = m_fibers.begin() + 1; it < m_fibers.end(); ++it) {
-        iomanager.run_on_forget(*it, [this]() { all_reader(); });
-    }
-
-    {
-        std::unique_lock< std::mutex > lk(m_test_done_mtx);
-        m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
+        calls.push_back(
+            folly::makeSemiFuture().via(iomanager.fiberExecutor(*it)).thenValue([this](auto) { all_reader(); }));
     }
+    folly::collectAll(calls).via(&e).get();
+    EXPECT_EQ(0, m_test_count.load());
 }
 
 TEST_F(SharedMutexTest, random_reader_writers) {
+    auto e = folly::QueuedImmediateExecutor();
+    auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
     for (const auto& f : m_fibers) {
-        iomanager.run_on_forget(f, [this]() { random_reader_writer(); });
-    }
-
-    {
-        std::unique_lock< std::mutex > lk(m_test_done_mtx);
-        m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
+        calls.push_back(
+            folly::makeSemiFuture().via(iomanager.fiberExecutor(f)).thenValue([this](auto) { random_reader_writer(); }));
     }
+    folly::collectAll(calls).via(&e).get();
+    EXPECT_EQ(0, m_test_count.load());
 }
 
 int main(int argc, char* argv[]) {
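
Usage sketch (not part of the patch): with the new fiberExecutor() accessor, callers can schedule work on a specific iomgr fiber and get a folly::SemiFuture back instead of going through the fire-and-forget run_on_forget() path, roughly as the updated tests do. The helper below is hypothetical; it only assumes the iomanager singleton accessor already used throughout the codebase and standard folly future chaining.

#include <folly/futures/Future.h>
#include <iomgr/iomgr.hpp>

// Hypothetical helper: run `work` on the given iomgr fiber and report completion
// through a SemiFuture the caller can block on or chain further.
template < typename Fn >
folly::SemiFuture< folly::Unit > run_on_fiber(iomgr::io_fiber_t fiber, Fn&& work) {
    // fiberExecutor() returns the folly::IOManagerExecutor bound to this fiber in
    // reactor_started(); via() makes the continuation execute on that fiber.
    return folly::makeSemiFuture()
        .via(iomanager.fiberExecutor(fiber))
        .thenValue([w = std::forward< Fn >(work)](auto&&) mutable { w(); })
        .semi();
}

// Example: block the calling thread until the fiber has run the lambda.
// run_on_fiber(fiber, [] { /* fiber-local work */ }).get();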