diff --git a/conanfile.py b/conanfile.py index 25b9b5d..1bcb47b 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class IOMgrConan(ConanFile): name = "iomgr" - version = "11.0.2" + version = "11.1.0" homepage = "https://github.com/eBay/IOManager" description = "Asynchronous event manager" diff --git a/src/include/iomgr/fiber_lib.hpp b/src/include/iomgr/fiber_lib.hpp index 69178e6..b798b8a 100644 --- a/src/include/iomgr/fiber_lib.hpp +++ b/src/include/iomgr/fiber_lib.hpp @@ -1,248 +1,250 @@ -#pragma once - -#ifdef USE_FOLLY_FIBER -#include -#include -#include -#else -#include -#include -#include -#include -#endif - -struct spdk_thread; - -namespace iomgr { -class IOReactor; -struct iomgr_msg; - -struct IOFiber { -public: - IOReactor* reactor; // Reactor this fiber is currently attached to - spdk_thread* spdk_thr{nullptr}; // In case of spdk, each fiber becomes spdk thread - uint32_t ordinal; // Global ordinal of this fiber (unique id across iomgr) - -public: - IOFiber(IOReactor* r, uint32_t o) : reactor{r}, ordinal{o} {} - virtual ~IOFiber() = default; - - virtual bool push_msg(iomgr_msg* msg) = 0; - virtual iomgr_msg* pop_msg() = 0; - virtual void close_channel() = 0; -}; - -#ifndef USE_FOLLY_FIBER -class FiberManagerLib { -private: - boost::fibers::fiber_specific_ptr< IOFiber > m_this_fiber; - -public: - template < typename T > - class Future : public boost::fibers::future< T > {}; - - template < typename T > - class Promise : public boost::fibers::promise< T > { - public: - /* - const decltype(&Promise< T >::set_value) setValue = &Promise< T >::set_value; - const auto setException = set_exception; - const auto getFuture = get_future; - */ - - void setValue(T t) { this->set_value(std::move(t)); } - - template < typename... Args > - void setException(Args&&... args) { - this->set_exception(args...); - } - - Future< T > getFuture() { - auto f{this->get_future()}; - return std::move(*((Future< T >*)&f)); - } - }; - - using mutex = boost::fibers::mutex; - class shared_mutex { - public: - void lock_shared(); - void lock(); - void unlock_shared(); - void unlock(); - - private: - static constexpr bool s_writer_priority{true}; - boost::fibers::detail::spinlock m_wait_q_splk{}; - boost::fibers::wait_queue m_wait_q; - boost::fibers::context* m_write_owner{nullptr}; - uint32_t m_readers{0}; - uint32_t m_write_waiters{0}; - }; - - FiberManagerLib(); - std::unique_ptr< IOFiber > create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal); - void start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop); - IOFiber* iofiber_self() const; - void set_this_iofiber(IOFiber* f); - void start_io_fiber(); - void yield(); - void yield_main(); -}; - -struct IOFiberBoostImpl : public IOFiber { - static constexpr size_t max_channel_cap{1024}; - -public: - boost::fibers::fiber::id fiber_id; // Boost specific fiber id - boost::fibers::buffered_channel< iomgr_msg* > channel; // Channel to exchange between main and this fiber - std::queue< iomgr_msg* > m_overflow_msgs; // Overflow queue if msgs can't be put in channel - -public: - IOFiberBoostImpl(IOReactor* r, uint32_t ordinal); - bool push_msg(iomgr_msg* msg) override; - iomgr_msg* pop_msg() override; - void close_channel() override; -}; -#else -class FiberManagerLib { -private: - folly::fibers::FiberManager m_fiber_mgr; - -public: - template < typename T, typename BatonT = folly::fibers::Baton > - struct SharedState { - folly::Try< T > val; - BatonT baton; - }; - - template < typename T, typename BatonT = folly::fibers::Baton > - class Future { - private: - std::shared_ptr< SharedState< T, BatonT > > m_shared_state; - - public: - Future(cshared< SharedState< T, BatonT > >& s) : m_shared_state{s} {} - - Future(const Future&) = delete; - Future& operator=(const Future&) = delete; - - Future(Future&& other) noexcept : m_shared_state{std::move(other.m_shared_state)} {} - Future& operator=(Future&& other) { - std::swap(m_shared_state, other.m_shared_state); - return *this; - } - - T get() { - m_shared_state->baton.wait(); - return *m_shared_state->val; - } - }; - - // Simpler implementation of Promise to ensure what we need for iomanager use case - template < typename T, typename BatonT = folly::fibers::Baton > - class Promise { - private: - std::shared_ptr< SharedState< T, BatonT > > m_shared_state; - - public: - Promise() { m_shared_state = std::make_shared< SharedState< T, BatonT > >(); } - - ~Promise() { - // if (m_shared_state->val) { - // setException(folly::make_exception_wrapper< std::logic_error >("promise not fulfilled")); - // } - } - - // not copyable - Promise(const Promise&) = delete; - Promise& operator=(const Promise&) = delete; - - // movable - Promise(Promise&& other) noexcept : m_shared_state{std::move(other.m_shared_state)} {} - Promise& operator=(Promise&& other) { - std::swap(m_shared_state, other.m_shared_state); - return *this; - } - - void setValue() { - static_assert(std::is_same< T, void >::value, "Use setValue(value) instead"); - setTry(folly::Try< void >()); - } - - template < class M > - void setValue(M&& value) { - static_assert(!std::is_same< T, void >::value, "Use setValue() instead"); - setTry(folly::Try< T >(std::forward< M >(value))); - } - - void setTry(folly::Try< T >&& t) { - // if (m_shared_state->val) { throw std::logic_error("promise already fulfilled"); } - - m_shared_state->val = std::move(t); - - // Baton::post has to be the last step here, since if Promise is not owned by the posting thread, it may be - // destroyed right after Baton::post is called. - m_shared_state->baton.post(); - } - - template < class F > - void setWith(F&& func) { - setTry(folly::makeTryWith(std::forward< F >(func))); - } - - void setException(folly::exception_wrapper e) { setTry(folly::Try< T >(e)); } - - void setException(std::exception_ptr e) { setException(folly::exception_wrapper{e}); } - - // Wrapper method to return baton mimicing as future - Future< T > getFuture() { return Future< T >{m_shared_state}; } - }; - - FiberManagerLib(); - std::unique_ptr< IOFiber > create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal); - void start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop); - IOFiber* iofiber_self() const; - void set_this_iofiber(IOFiber* f); - void start_io_fiber(); - void yield(); - void yield_main(); // Yield main fiber - - using mutex = folly::fibers::TimedMutex; - using shared_mutex = folly::fibers::TimedRWMutex; -}; - -class ReactorLoopController : public folly::fibers::LoopController { -private: - class SimpleTimeoutManager; - - folly::fibers::FiberManager* m_fm{nullptr}; - std::unique_ptr< SimpleTimeoutManager > m_tm_mgr; - std::shared_ptr< folly::HHWheelTimer > m_wheel_timer; - std::atomic< int > m_remote_schedule_count{0}; - int m_remote_run_count{0}; - -public: - ReactorLoopController(); - void setFiberManager(folly::fibers::FiberManager* mgr) override { m_fm = mgr; } - void runLoop() override; - void schedule() override; - void scheduleThreadSafe() override; - void runEagerFiber(folly::fibers::Fiber* f) override; - folly::HHWheelTimer* timer() override; -}; - -struct IOFiberFollyImpl : public IOFiber { -public: - std::queue< iomgr_msg* > channel; - folly::fibers::Baton channel_baton; - -public: - IOFiberFollyImpl(IOReactor* r, uint32_t ordinal); - bool push_msg(iomgr_msg* msg) override; - iomgr_msg* pop_msg() override; - void close_channel() override; -}; -#endif - -}; // namespace iomgr +#pragma once + +#ifdef USE_FOLLY_FIBER +#include +#include +#include +#else +#include +#include +#include +#include +#endif + +struct spdk_thread; + +namespace iomgr { +class IOReactor; +struct iomgr_msg; + +struct IOFiber { +public: + IOReactor* reactor; // Reactor this fiber is currently attached to + spdk_thread* spdk_thr{nullptr}; // In case of spdk, each fiber becomes spdk thread + uint32_t ordinal; // Global ordinal of this fiber (unique id across iomgr) + +public: + IOFiber(IOReactor* r, uint32_t o) : reactor{r}, ordinal{o} {} + virtual ~IOFiber() = default; + + virtual bool push_msg(iomgr_msg* msg) = 0; + virtual iomgr_msg* pop_msg() = 0; + virtual void close_channel() = 0; +}; + +#ifndef USE_FOLLY_FIBER +class FiberManagerLib { +private: + boost::fibers::fiber_specific_ptr< IOFiber > m_this_fiber; + +public: + template < typename T > + class Future : public boost::fibers::future< T > {}; + + template < typename T > + class Promise : public boost::fibers::promise< T > { + public: + /* + const decltype(&Promise< T >::set_value) setValue = &Promise< T >::set_value; + const auto setException = set_exception; + const auto getFuture = get_future; + */ + + void setValue(T t) { this->set_value(std::move(t)); } + + template < typename... Args > + void setException(Args&&... args) { + this->set_exception(args...); + } + + Future< T > getFuture() { + auto f{this->get_future()}; + return std::move(*((Future< T >*)&f)); + } + }; + + using mutex = boost::fibers::mutex; + class shared_mutex { + public: + void lock_shared(); + void lock(); + void unlock_shared(); + void unlock(); + + private: + static constexpr bool s_writer_priority{true}; + boost::fibers::detail::spinlock m_wait_q_splk{}; + boost::fibers::wait_queue m_wait_q; + boost::fibers::context* m_write_owner{nullptr}; + uint32_t m_readers{0}; + uint32_t m_write_waiters{0}; + }; + + FiberManagerLib(); + std::unique_ptr< IOFiber > create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal); + void start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop); + IOFiber* iofiber_self() const; + uint32_t iofiber_self_ordinal() const; + void set_this_iofiber(IOFiber* f); + void start_io_fiber(); + void yield(); + void yield_main(); +}; + +struct IOFiberBoostImpl : public IOFiber { + static constexpr size_t max_channel_cap{1024}; + +public: + boost::fibers::fiber::id fiber_id; // Boost specific fiber id + boost::fibers::buffered_channel< iomgr_msg* > channel; // Channel to exchange between main and this fiber + std::queue< iomgr_msg* > m_overflow_msgs; // Overflow queue if msgs can't be put in channel + +public: + IOFiberBoostImpl(IOReactor* r, uint32_t ordinal); + bool push_msg(iomgr_msg* msg) override; + iomgr_msg* pop_msg() override; + void close_channel() override; +}; +#else +class FiberManagerLib { +private: + folly::fibers::FiberManager m_fiber_mgr; + +public: + template < typename T, typename BatonT = folly::fibers::Baton > + struct SharedState { + folly::Try< T > val; + BatonT baton; + }; + + template < typename T, typename BatonT = folly::fibers::Baton > + class Future { + private: + std::shared_ptr< SharedState< T, BatonT > > m_shared_state; + + public: + Future(cshared< SharedState< T, BatonT > >& s) : m_shared_state{s} {} + + Future(const Future&) = delete; + Future& operator=(const Future&) = delete; + + Future(Future&& other) noexcept : m_shared_state{std::move(other.m_shared_state)} {} + Future& operator=(Future&& other) { + std::swap(m_shared_state, other.m_shared_state); + return *this; + } + + T get() { + m_shared_state->baton.wait(); + return *m_shared_state->val; + } + }; + + // Simpler implementation of Promise to ensure what we need for iomanager use case + template < typename T, typename BatonT = folly::fibers::Baton > + class Promise { + private: + std::shared_ptr< SharedState< T, BatonT > > m_shared_state; + + public: + Promise() { m_shared_state = std::make_shared< SharedState< T, BatonT > >(); } + + ~Promise() { + // if (m_shared_state->val) { + // setException(folly::make_exception_wrapper< std::logic_error >("promise not fulfilled")); + // } + } + + // not copyable + Promise(const Promise&) = delete; + Promise& operator=(const Promise&) = delete; + + // movable + Promise(Promise&& other) noexcept : m_shared_state{std::move(other.m_shared_state)} {} + Promise& operator=(Promise&& other) { + std::swap(m_shared_state, other.m_shared_state); + return *this; + } + + void setValue() { + static_assert(std::is_same< T, void >::value, "Use setValue(value) instead"); + setTry(folly::Try< void >()); + } + + template < class M > + void setValue(M&& value) { + static_assert(!std::is_same< T, void >::value, "Use setValue() instead"); + setTry(folly::Try< T >(std::forward< M >(value))); + } + + void setTry(folly::Try< T >&& t) { + // if (m_shared_state->val) { throw std::logic_error("promise already fulfilled"); } + + m_shared_state->val = std::move(t); + + // Baton::post has to be the last step here, since if Promise is not owned by the posting thread, it may be + // destroyed right after Baton::post is called. + m_shared_state->baton.post(); + } + + template < class F > + void setWith(F&& func) { + setTry(folly::makeTryWith(std::forward< F >(func))); + } + + void setException(folly::exception_wrapper e) { setTry(folly::Try< T >(e)); } + + void setException(std::exception_ptr e) { setException(folly::exception_wrapper{e}); } + + // Wrapper method to return baton mimicing as future + Future< T > getFuture() { return Future< T >{m_shared_state}; } + }; + + FiberManagerLib(); + std::unique_ptr< IOFiber > create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal); + void start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop); + IOFiber* iofiber_self() const; + uint32_t iofiber_self_ordinal() const; + void set_this_iofiber(IOFiber* f); + void start_io_fiber(); + void yield(); + void yield_main(); // Yield main fiber + + using mutex = folly::fibers::TimedMutex; + using shared_mutex = folly::fibers::TimedRWMutex; +}; + +class ReactorLoopController : public folly::fibers::LoopController { +private: + class SimpleTimeoutManager; + + folly::fibers::FiberManager* m_fm{nullptr}; + std::unique_ptr< SimpleTimeoutManager > m_tm_mgr; + std::shared_ptr< folly::HHWheelTimer > m_wheel_timer; + std::atomic< int > m_remote_schedule_count{0}; + int m_remote_run_count{0}; + +public: + ReactorLoopController(); + void setFiberManager(folly::fibers::FiberManager* mgr) override { m_fm = mgr; } + void runLoop() override; + void schedule() override; + void scheduleThreadSafe() override; + void runEagerFiber(folly::fibers::Fiber* f) override; + folly::HHWheelTimer* timer() override; +}; + +struct IOFiberFollyImpl : public IOFiber { +public: + std::queue< iomgr_msg* > channel; + folly::fibers::Baton channel_baton; + +public: + IOFiberFollyImpl(IOReactor* r, uint32_t ordinal); + bool push_msg(iomgr_msg* msg) override; + iomgr_msg* pop_msg() override; + void close_channel() override; +}; +#endif + +}; // namespace iomgr diff --git a/src/lib/epoll/reactor_epoll.cpp b/src/lib/epoll/reactor_epoll.cpp index 5e139ad..afbffc5 100644 --- a/src/lib/epoll/reactor_epoll.cpp +++ b/src/lib/epoll/reactor_epoll.cpp @@ -54,18 +54,18 @@ void IOReactorEPoll::init_impl() { m_epollfd = epoll_create1(0); if (m_epollfd < 1) { assert(0); - REACTOR_LOG(ERROR, , , "epoll_create failed: {}", strerror(errno)); + REACTOR_LOG(ERROR, "epoll_create failed: {}", strerror(errno)); goto error; } std::atomic_thread_fence(std::memory_order_acquire); - REACTOR_LOG(TRACE, , , "EPoll created: {}", m_epollfd); + REACTOR_LOG(TRACE, "EPoll created: {}", m_epollfd); // Create a message fd and add it to tht epollset evfd = eventfd(0, EFD_NONBLOCK); if (evfd == -1) { assert(0); - REACTOR_LOG(ERROR, , , "Unable to open the eventfd, marking this as non-io reactor"); + REACTOR_LOG(ERROR, "Unable to open the eventfd, marking this as non-io reactor"); goto error; } m_msg_iodev = iomanager.generic_interface()->make_io_device(backing_dev_t{evfd}, EPOLLIN, 1 /* pri */, nullptr, @@ -103,6 +103,9 @@ void IOReactorEPoll::stop_impl() { iomgr_msg::free(msg); } if (dropped) { LOGINFO("Exiting the reactor with {} messages yet to handle, dropping them", dropped); } + + // Now clear all the iodevs which were removed + m_removed_iodevs.clear(); } void IOReactorEPoll::listen() { @@ -117,27 +120,34 @@ void IOReactorEPoll::listen() { idle_time_wakeup_poller(); return; } else if (num_fds < 0) { - REACTOR_LOG(ERROR, , , "epoll wait failed: {} strerror {}", errno, strerror(errno)); + REACTOR_LOG(ERROR, "epoll wait failed: {} strerror {}", errno, strerror(errno)); return; } m_metrics->fds_on_event_count += num_fds; // Next sort the events based on priority and handle them in that order std::sort(events.begin(), (events.begin() + num_fds), compare_priority); - for (auto i = 0; i < num_fds; ++i) { + + m_event_processing_phase = true; + for (auto i = 0; (i < num_fds) && (m_keep_running); ++i) { auto& e = events[i]; if (e.data.ptr == (void*)m_msg_iodev.get()) { - REACTOR_LOG(TRACE, , , "Processing event on msg fd: {}", m_msg_iodev->fd()); + REACTOR_LOG(TRACE, "Processing event on msg fd: {}", m_msg_iodev->fd()); ++m_metrics->msg_event_wakeup_count; on_msg_fd_notification(); // It is possible for io thread status by the msg processor. Catch at the exit and return if (!is_io_reactor()) { - REACTOR_LOG(INFO, , , "listen will exit because this is no longer an io reactor"); + REACTOR_LOG(INFO, "listen will exit because this is no longer an io reactor"); return; } } else { IODevice* iodev = (IODevice*)e.data.ptr; + if (!m_removed_iodevs.empty() && (m_removed_iodevs.find(iodev) != m_removed_iodevs.cend())) { + REACTOR_LOG(DEBUG, "Skipping event on fd {} as it is has been removed", iodev->fd()); + continue; + } + if (iodev->tinfo) { ++m_metrics->timer_wakeup_count; timer_epoll::on_timer_fd_notification(iodev); @@ -146,6 +156,10 @@ void IOReactorEPoll::listen() { } } } + m_event_processing_phase = false; + + // Now clear all the iodevs which were removed, thus free IODev pointer + if (!m_removed_iodevs.empty()) { m_removed_iodevs.clear(); } } int IOReactorEPoll::add_iodev_impl(const io_device_ptr& iodev) { @@ -157,7 +171,7 @@ int IOReactorEPoll::add_iodev_impl(const io_device_ptr& iodev) { strerror(errno)); return -1; } - REACTOR_LOG(DEBUG, , , "Added fd {} to this io thread's epoll fd {}, data.ptr={}", iodev->fd(), m_epollfd, + REACTOR_LOG(DEBUG, "Added fd {} to this io thread's epoll fd {}, data.ptr={}", iodev->fd(), m_epollfd, (void*)ev.data.ptr); return 0; } @@ -168,18 +182,24 @@ int IOReactorEPoll::remove_iodev_impl(const io_device_ptr& iodev) { strerror(errno)); return -1; } - REACTOR_LOG(DEBUG, , , "Removed fd {} from this io thread's epoll fd {}", iodev->fd(), m_epollfd); + REACTOR_LOG(DEBUG, "Removed fd {} from this io thread's epoll fd {}", iodev->fd(), m_epollfd); + + // If we are processing batch of events and on first event, if we remove the iodev->fd from epoll and free the + // iodev, it is possible that next event could target the same iodev before epoll got a chance to process the + // epoll_ctl. In that case, listen() will refer to iodev, which could have been freed by the remove_iodev() caller + // and can cause use-after-free. Thus we are saving the iodev until we are done with the batch of events. + if (m_event_processing_phase) { m_removed_iodevs.insert(std::make_pair(iodev.get(), iodev)); } return 0; } void IOReactorEPoll::put_msg(iomgr_msg* msg) { if (!m_msg_iodev) { - REACTOR_LOG(INFO, , , "Received msg after reactor is shutdown, ignoring"); + REACTOR_LOG(INFO, "Received msg after reactor is shutdown, ignoring"); iomgr_msg::free(msg); return; } - REACTOR_LOG(DEBUG, , , "Put msg to its msg fd = {}, ptr = {}", m_msg_iodev->fd(), (void*)m_msg_iodev.get()); + REACTOR_LOG(DEBUG, "Put msg to its msg fd = {}, ptr = {}", m_msg_iodev->fd(), (void*)m_msg_iodev.get()); m_msg_q.enqueue(msg); @@ -217,7 +237,7 @@ void IOReactorEPoll::process_messages() { } if ((msg_count == max_msg_batch_size) && (!m_msg_q.empty())) { - REACTOR_LOG(DEBUG, , , "Reached max msg_count batch {}, yielding and will process again", msg_count); + REACTOR_LOG(DEBUG, "Reached max msg_count batch {}, yielding and will process again", msg_count); const uint64_t temp{1}; while ((write(m_msg_iodev->fd(), &temp, sizeof(uint64_t)) < 0) && (errno == EAGAIN)) { ++m_metrics->msg_iodev_busy_count; @@ -237,7 +257,7 @@ void IOReactorEPoll::on_user_iodev_notification(IODevice* iodev, int event) { ++m_metrics->outstanding_ops; ++m_metrics->io_event_wakeup_count; - REACTOR_LOG(TRACE, , , "Processing event on user iodev: {}", iodev->dev_id()); + REACTOR_LOG(TRACE, "Processing event on user iodev: {}", iodev->dev_id()); iodev->cb(iodev, iodev->cookie, event); --m_metrics->outstanding_ops; diff --git a/src/lib/epoll/reactor_epoll.hpp b/src/lib/epoll/reactor_epoll.hpp index 38b50a7..5176295 100644 --- a/src/lib/epoll/reactor_epoll.hpp +++ b/src/lib/epoll/reactor_epoll.hpp @@ -41,9 +41,11 @@ class IOReactorEPoll : public IOReactor { void idle_time_wakeup_poller(); private: - std::atomic< bool > m_msg_handler_on; // Is Message handling ongoing now - int m_epollfd = -1; // Parent epoll context for this thread - io_device_ptr m_msg_iodev; // iodev for the messages - folly::UMPSCQueue< iomgr_msg*, false > m_msg_q; // Q of message for this thread + std::atomic< bool > m_msg_handler_on; // Is Message handling ongoing now + int m_epollfd = -1; // Parent epoll context for this thread + io_device_ptr m_msg_iodev; // iodev for the messages + folly::UMPSCQueue< iomgr_msg*, false > m_msg_q; // Q of message for this thread + std::unordered_map< IODevice*, shared< IODevice > > m_removed_iodevs; // Pending IODevices to be removed from epoll + bool m_event_processing_phase{false}; // Are epoll events are being processed now }; } // namespace iomgr diff --git a/src/lib/reactor/fiber_lib_boost.cpp b/src/lib/reactor/fiber_lib_boost.cpp index c1c3236..38e7898 100644 --- a/src/lib/reactor/fiber_lib_boost.cpp +++ b/src/lib/reactor/fiber_lib_boost.cpp @@ -1,135 +1,140 @@ -#ifndef USE_FOLLY_FIBER -#include "reactor/reactor.hpp" -#include - -namespace iomgr { -///////////////////////// FiberManagerLib Section ////////////////////////////// -FiberManagerLib::FiberManagerLib() : m_this_fiber{[](IOFiber* f) {}} {} - -std::unique_ptr< IOFiber > FiberManagerLib::create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal) { - return std::make_unique< IOFiberBoostImpl >(reactor, fiber_ordinal); -} - -void FiberManagerLib::start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop) { - boost::fibers::fiber([f, this, channel_loop]() { - set_this_iofiber(f); - channel_loop(f); - }).detach(); -} - -void FiberManagerLib::set_this_iofiber(IOFiber* f) { - auto* fiber = r_cast< IOFiberBoostImpl* >(f); - fiber->fiber_id = boost::this_fiber::get_id(); - m_this_fiber.reset(fiber); -} - -IOFiber* FiberManagerLib::iofiber_self() const { return &(*m_this_fiber); }; - -void FiberManagerLib::yield() { boost::this_fiber::yield(); } - -void FiberManagerLib::yield_main() { boost::this_fiber::yield(); } - -////////////////////////// shared_mutex implementation /////////////////////////// -void FiberManagerLib::shared_mutex::lock_shared() { - do { - boost::fibers::context* active_ctx = boost::fibers::context::active(); - - boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; - if ((m_write_owner != nullptr) || (m_write_waiters != 0)) { - // Either Owned by the writer or some writer is waiting, add to the waiter q - LOGTRACEMOD(iomgr, "[Reader Lock for ctx={}]: Queued. Owned by a writer={} and/or writers are waiting={}", - (void*)active_ctx, (void*)m_write_owner, m_write_waiters); - m_wait_q.suspend_and_wait(lk, active_ctx); - } else { - ++m_readers; // No fiber is holding write access or needing write access - LOGTRACEMOD(iomgr, "[Reader Lock for ctx={}]: Won, num_readers={}", (void*)active_ctx, m_readers); - break; - } - } while (true); -} - -void FiberManagerLib::shared_mutex::lock() { - bool am_i_write_waiter{false}; - do { - boost::fibers::context* active_ctx = boost::fibers::context::active(); - boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; - if ((m_write_owner == nullptr) && (m_readers == 0)) { - m_write_owner = active_ctx; // No exclusive and shared access yet - if (am_i_write_waiter) { - --m_write_waiters; - LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Was waiter, now won, num_write_waiters={}", - (void*)active_ctx, m_write_waiters); - } else { - LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Won", (void*)active_ctx); - } - break; - } else { - // Owned by the writer or reader, add ourselves to writer queue - if (s_writer_priority) { - if (!am_i_write_waiter) { - ++m_write_waiters; - am_i_write_waiter = true; - } - LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Queued, owned by owner={} num_readers={}", - (void*)active_ctx, (void*)m_write_owner, m_readers); - } - m_wait_q.suspend_and_wait(lk, active_ctx); - LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Queued earlier now awakened, num_readers={}", - (void*)active_ctx, m_readers); - } - } while (true); -} - -void FiberManagerLib::shared_mutex::unlock_shared() { - boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; - if (m_readers == 0) { - throw boost::fibers::lock_error{std::make_error_code(std::errc::operation_not_permitted), - "boost fiber: duplicate unlock shared, can cause deadlocks"}; - } - --m_readers; - LOGTRACEMOD(iomgr, "Reader unlock, num_readers={}", m_readers); - m_wait_q.notify_all(); -} - -void FiberManagerLib::shared_mutex::unlock() { - boost::fibers::context* active_ctx = boost::fibers::context::active(); - - boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; - if (m_write_owner != active_ctx) { - throw boost::fibers::lock_error{std::make_error_code(std::errc::operation_not_permitted), - "boost fiber: unlock called by different fiber to the one locked it"}; - } - m_write_owner = nullptr; - LOGTRACEMOD(iomgr, "[Writer unLock for ctx={}]: unlocked", (void*)active_ctx); - m_wait_q.notify_all(); -} - -///////////////////////// IOFiberBoostImpl Section ////////////////////////////// -IOFiberBoostImpl::IOFiberBoostImpl(IOReactor* r, uint32_t ordinal) : IOFiber{r, ordinal}, channel{max_channel_cap} {} - -bool IOFiberBoostImpl::push_msg(iomgr_msg* msg) { - auto status = channel.try_push(msg); - if (status == boost::fibers::channel_op_status::full) { m_overflow_msgs.push(msg); } - return (status == boost::fibers::channel_op_status::success); -} - -iomgr_msg* IOFiberBoostImpl::pop_msg() { - iomgr_msg* msg{nullptr}; - channel.pop(msg); - - // We poped a msg, so overflow msgs, it can be pushed at the tail of the fiber channel queue - if (!m_overflow_msgs.empty()) { - auto status = channel.try_push(m_overflow_msgs.front()); - if (status != boost::fibers::channel_op_status::success) { - LOGMSG_ASSERT_EQ((int)status, (int)boost::fibers::channel_op_status::success, - "Moving msg from overflow to fiber loop channel has failed, unexpected"); - } else { - m_overflow_msgs.pop(); - } - } - return msg; -} - -void IOFiberBoostImpl::close_channel() { channel.close(); } -} // namespace iomgr -#endif +#ifndef USE_FOLLY_FIBER +#include "reactor/reactor.hpp" +#include + +namespace iomgr { +///////////////////////// FiberManagerLib Section ////////////////////////////// +FiberManagerLib::FiberManagerLib() : m_this_fiber{[](IOFiber* f) {}} {} + +std::unique_ptr< IOFiber > FiberManagerLib::create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal) { + return std::make_unique< IOFiberBoostImpl >(reactor, fiber_ordinal); +} + +void FiberManagerLib::start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop) { + boost::fibers::fiber([f, this, channel_loop]() { + set_this_iofiber(f); + channel_loop(f); + }).detach(); +} + +void FiberManagerLib::set_this_iofiber(IOFiber* f) { + auto* fiber = r_cast< IOFiberBoostImpl* >(f); + fiber->fiber_id = boost::this_fiber::get_id(); + m_this_fiber.reset(fiber); +} + +IOFiber* FiberManagerLib::iofiber_self() const { return &(*m_this_fiber); }; + +uint32_t FiberManagerLib::iofiber_self_ordinal() const { + auto f = iofiber_self(); + return f ? f->ordinal : 0; +} + +void FiberManagerLib::yield() { boost::this_fiber::yield(); } + +void FiberManagerLib::yield_main() { boost::this_fiber::yield(); } + +////////////////////////// shared_mutex implementation /////////////////////////// +void FiberManagerLib::shared_mutex::lock_shared() { + do { + boost::fibers::context* active_ctx = boost::fibers::context::active(); + + boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; + if ((m_write_owner != nullptr) || (m_write_waiters != 0)) { + // Either Owned by the writer or some writer is waiting, add to the waiter q + LOGTRACEMOD(iomgr, "[Reader Lock for ctx={}]: Queued. Owned by a writer={} and/or writers are waiting={}", + (void*)active_ctx, (void*)m_write_owner, m_write_waiters); + m_wait_q.suspend_and_wait(lk, active_ctx); + } else { + ++m_readers; // No fiber is holding write access or needing write access + LOGTRACEMOD(iomgr, "[Reader Lock for ctx={}]: Won, num_readers={}", (void*)active_ctx, m_readers); + break; + } + } while (true); +} + +void FiberManagerLib::shared_mutex::lock() { + bool am_i_write_waiter{false}; + do { + boost::fibers::context* active_ctx = boost::fibers::context::active(); + boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; + if ((m_write_owner == nullptr) && (m_readers == 0)) { + m_write_owner = active_ctx; // No exclusive and shared access yet + if (am_i_write_waiter) { + --m_write_waiters; + LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Was waiter, now won, num_write_waiters={}", + (void*)active_ctx, m_write_waiters); + } else { + LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Won", (void*)active_ctx); + } + break; + } else { + // Owned by the writer or reader, add ourselves to writer queue + if (s_writer_priority) { + if (!am_i_write_waiter) { + ++m_write_waiters; + am_i_write_waiter = true; + } + LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Queued, owned by owner={} num_readers={}", + (void*)active_ctx, (void*)m_write_owner, m_readers); + } + m_wait_q.suspend_and_wait(lk, active_ctx); + LOGTRACEMOD(iomgr, "[Writer Lock for ctx={}]: Queued earlier now awakened, num_readers={}", + (void*)active_ctx, m_readers); + } + } while (true); +} + +void FiberManagerLib::shared_mutex::unlock_shared() { + boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; + if (m_readers == 0) { + throw boost::fibers::lock_error{std::make_error_code(std::errc::operation_not_permitted), + "boost fiber: duplicate unlock shared, can cause deadlocks"}; + } + --m_readers; + LOGTRACEMOD(iomgr, "Reader unlock, num_readers={}", m_readers); + m_wait_q.notify_all(); +} + +void FiberManagerLib::shared_mutex::unlock() { + boost::fibers::context* active_ctx = boost::fibers::context::active(); + + boost::fibers::detail::spinlock_lock lk{m_wait_q_splk}; + if (m_write_owner != active_ctx) { + throw boost::fibers::lock_error{std::make_error_code(std::errc::operation_not_permitted), + "boost fiber: unlock called by different fiber to the one locked it"}; + } + m_write_owner = nullptr; + LOGTRACEMOD(iomgr, "[Writer unLock for ctx={}]: unlocked", (void*)active_ctx); + m_wait_q.notify_all(); +} + +///////////////////////// IOFiberBoostImpl Section ////////////////////////////// +IOFiberBoostImpl::IOFiberBoostImpl(IOReactor* r, uint32_t ordinal) : IOFiber{r, ordinal}, channel{max_channel_cap} {} + +bool IOFiberBoostImpl::push_msg(iomgr_msg* msg) { + auto status = channel.try_push(msg); + if (status == boost::fibers::channel_op_status::full) { m_overflow_msgs.push(msg); } + return (status == boost::fibers::channel_op_status::success); +} + +iomgr_msg* IOFiberBoostImpl::pop_msg() { + iomgr_msg* msg{nullptr}; + channel.pop(msg); + + // We poped a msg, so overflow msgs, it can be pushed at the tail of the fiber channel queue + if (!m_overflow_msgs.empty()) { + auto status = channel.try_push(m_overflow_msgs.front()); + if (status != boost::fibers::channel_op_status::success) { + LOGMSG_ASSERT_EQ((int)status, (int)boost::fibers::channel_op_status::success, + "Moving msg from overflow to fiber loop channel has failed, unexpected"); + } else { + m_overflow_msgs.pop(); + } + } + return msg; +} + +void IOFiberBoostImpl::close_channel() { channel.close(); } +} // namespace iomgr +#endif diff --git a/src/lib/reactor/fiber_lib_folly.cpp b/src/lib/reactor/fiber_lib_folly.cpp index 11fddc2..897f974 100644 --- a/src/lib/reactor/fiber_lib_folly.cpp +++ b/src/lib/reactor/fiber_lib_folly.cpp @@ -1,126 +1,131 @@ -#ifdef USE_FOLLY_FIBER -#include "reactor/reactor.hpp" -#include -#include - -namespace iomgr { -/////////////////////////////////////// FiberManagerLib Section /////////////////////////////////////// -FiberManagerLib::FiberManagerLib() : - m_fiber_mgr{folly::fibers::LocalType< IOFiberFollyImpl* >(), std::make_unique< ReactorLoopController >()} {} - -std::unique_ptr< IOFiber > FiberManagerLib::create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal) { - return std::make_unique< IOFiberFollyImpl >(reactor, fiber_ordinal); -} - -void FiberManagerLib::start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop) { - m_fiber_mgr.addTask([this, f, channel_loop]() { - set_this_iofiber(f); - channel_loop(f); - }); -} - -void FiberManagerLib::set_this_iofiber(IOFiber* f) { - auto* fiber = r_cast< IOFiberFollyImpl* >(f); - folly::fibers::local< IOFiberFollyImpl* >() = fiber; -} - -IOFiber* FiberManagerLib::iofiber_self() const { return folly::fibers::local< IOFiberFollyImpl* >(); }; - -void FiberManagerLib::yield() { m_fiber_mgr.yield(); } - -void FiberManagerLib::yield_main() { m_fiber_mgr.loopUntilNoReadyImpl(); } - -/////////////////////////////////////// ReactorLoopController Section /////////////////////////////////////// -ReactorLoopController::ReactorLoopController() : - m_tm_mgr(std::make_unique< SimpleTimeoutManager >(*this)), - m_wheel_timer(folly::HHWheelTimer::newTimer(m_tm_mgr.get())) {} - -void ReactorLoopController::runLoop() { - do { - if (m_remote_run_count < m_remote_schedule_count) { - for (; m_remote_schedule_count < m_remote_schedule_count; ++m_remote_run_count) { - if (m_fm->shouldRunLoopRemote()) { m_fm->loopUntilNoReadyImpl(); } - } - } else { - m_fm->loopUntilNoReadyImpl(); - } - } while (m_remote_run_count < m_remote_schedule_count); -} - -void ReactorLoopController::schedule() {} - -void ReactorLoopController::scheduleThreadSafe() { ++m_remote_schedule_count; } - -void ReactorLoopController::runEagerFiber(folly::fibers::Fiber* f) { m_fm->runEagerFiberImpl(f); } - -folly::HHWheelTimer* ReactorLoopController::timer() { return m_wheel_timer.get(); } - -////////////////////////// Used the code from folly to implement a simple version of TimeoutManager -/** - * A simple version of TimeoutManager that maintains only a single AsyncTimeout - * object that is used by HHWheelTimer in SimpleLoopController. - */ -class ReactorLoopController::SimpleTimeoutManager : public folly::TimeoutManager { -public: - explicit SimpleTimeoutManager(ReactorLoopController& lc) : m_rlc(lc) {} - - void attachTimeoutManager(folly::AsyncTimeout*, folly::TimeoutManager::InternalEnum) final {} - void detachTimeoutManager(folly::AsyncTimeout*) final {} - - bool scheduleTimeout(folly::AsyncTimeout* obj, timeout_type timeout) final { - // Make sure that we don't try to use this manager with two timeouts. - // CHECK(!timeout_ || timeout_->first == obj); - m_timeout.emplace(obj, std::chrono::steady_clock::now() + timeout); - return true; - } - - void cancelTimeout(folly::AsyncTimeout* obj) final { - // CHECK(timeout_ && timeout_->first == obj); - m_timeout.reset(); - } - - void bumpHandlingTime() final {} - - // bool isInTimeoutManagerThread() final { return m_rlc.isInLoopThread(); } - bool isInTimeoutManagerThread() final { return false; } - - void runTimeouts() { - std::chrono::steady_clock::time_point tp = std::chrono::steady_clock::now(); - if (!m_timeout || tp < m_timeout->second) { return; } - - auto* timeout = m_timeout->first; - m_timeout.reset(); - timeout->timeoutExpired(); - } - -private: - ReactorLoopController& m_rlc; - folly::Optional< std::pair< folly::AsyncTimeout*, std::chrono::steady_clock::time_point > > m_timeout; -}; - -/////////////////////////////////////// IOFiberFollyImpl Section /////////////////////////////////////// -IOFiberFollyImpl::IOFiberFollyImpl(IOReactor* r, uint32_t ordinal) : IOFiber{r, ordinal} {} - -bool IOFiberFollyImpl::push_msg(iomgr_msg* msg) { - channel.push(msg); - channel_baton.post(); - return true; -} - -iomgr_msg* IOFiberFollyImpl::pop_msg() { - iomgr_msg* msg{nullptr}; - channel_baton.wait(); - // LOGMSG_ASSERT_NE(channel.empty(), true, "Fiber channel baton wokenup but no msg in queue"); - if (!channel.empty()) { - msg = channel.front(); - channel.pop(); - } - return msg; -} - -void IOFiberFollyImpl::close_channel() { - std::queue< iomgr_msg* > empty; - std::swap(channel, empty); -} -} // namespace iomgr -#endif +#ifdef USE_FOLLY_FIBER +#include "reactor/reactor.hpp" +#include +#include + +namespace iomgr { +/////////////////////////////////////// FiberManagerLib Section /////////////////////////////////////// +FiberManagerLib::FiberManagerLib() : + m_fiber_mgr{folly::fibers::LocalType< IOFiberFollyImpl* >(), std::make_unique< ReactorLoopController >()} {} + +std::unique_ptr< IOFiber > FiberManagerLib::create_iofiber(IOReactor* reactor, uint32_t fiber_ordinal) { + return std::make_unique< IOFiberFollyImpl >(reactor, fiber_ordinal); +} + +void FiberManagerLib::start_iofiber(IOFiber* f, const std::function< void(IOFiber*) >& channel_loop) { + m_fiber_mgr.addTask([this, f, channel_loop]() { + set_this_iofiber(f); + channel_loop(f); + }); +} + +void FiberManagerLib::set_this_iofiber(IOFiber* f) { + auto* fiber = r_cast< IOFiberFollyImpl* >(f); + folly::fibers::local< IOFiberFollyImpl* >() = fiber; +} + +IOFiber* FiberManagerLib::iofiber_self() const { return folly::fibers::local< IOFiberFollyImpl* >(); }; + +uint32_t FiberManagerLib::iofiber_self_ordinal() const { + auto f = iofiber_self(); + return f ? f->ordinal : 0; +} + +void FiberManagerLib::yield() { m_fiber_mgr.yield(); } + +void FiberManagerLib::yield_main() { m_fiber_mgr.loopUntilNoReadyImpl(); } + +/////////////////////////////////////// ReactorLoopController Section /////////////////////////////////////// +ReactorLoopController::ReactorLoopController() : + m_tm_mgr(std::make_unique< SimpleTimeoutManager >(*this)), + m_wheel_timer(folly::HHWheelTimer::newTimer(m_tm_mgr.get())) {} + +void ReactorLoopController::runLoop() { + do { + if (m_remote_run_count < m_remote_schedule_count) { + for (; m_remote_schedule_count < m_remote_schedule_count; ++m_remote_run_count) { + if (m_fm->shouldRunLoopRemote()) { m_fm->loopUntilNoReadyImpl(); } + } + } else { + m_fm->loopUntilNoReadyImpl(); + } + } while (m_remote_run_count < m_remote_schedule_count); +} + +void ReactorLoopController::schedule() {} + +void ReactorLoopController::scheduleThreadSafe() { ++m_remote_schedule_count; } + +void ReactorLoopController::runEagerFiber(folly::fibers::Fiber* f) { m_fm->runEagerFiberImpl(f); } + +folly::HHWheelTimer* ReactorLoopController::timer() { return m_wheel_timer.get(); } + +////////////////////////// Used the code from folly to implement a simple version of TimeoutManager +/** + * A simple version of TimeoutManager that maintains only a single AsyncTimeout + * object that is used by HHWheelTimer in SimpleLoopController. + */ +class ReactorLoopController::SimpleTimeoutManager : public folly::TimeoutManager { +public: + explicit SimpleTimeoutManager(ReactorLoopController& lc) : m_rlc(lc) {} + + void attachTimeoutManager(folly::AsyncTimeout*, folly::TimeoutManager::InternalEnum) final {} + void detachTimeoutManager(folly::AsyncTimeout*) final {} + + bool scheduleTimeout(folly::AsyncTimeout* obj, timeout_type timeout) final { + // Make sure that we don't try to use this manager with two timeouts. + // CHECK(!timeout_ || timeout_->first == obj); + m_timeout.emplace(obj, std::chrono::steady_clock::now() + timeout); + return true; + } + + void cancelTimeout(folly::AsyncTimeout* obj) final { + // CHECK(timeout_ && timeout_->first == obj); + m_timeout.reset(); + } + + void bumpHandlingTime() final {} + + // bool isInTimeoutManagerThread() final { return m_rlc.isInLoopThread(); } + bool isInTimeoutManagerThread() final { return false; } + + void runTimeouts() { + std::chrono::steady_clock::time_point tp = std::chrono::steady_clock::now(); + if (!m_timeout || tp < m_timeout->second) { return; } + + auto* timeout = m_timeout->first; + m_timeout.reset(); + timeout->timeoutExpired(); + } + +private: + ReactorLoopController& m_rlc; + folly::Optional< std::pair< folly::AsyncTimeout*, std::chrono::steady_clock::time_point > > m_timeout; +}; + +/////////////////////////////////////// IOFiberFollyImpl Section /////////////////////////////////////// +IOFiberFollyImpl::IOFiberFollyImpl(IOReactor* r, uint32_t ordinal) : IOFiber{r, ordinal} {} + +bool IOFiberFollyImpl::push_msg(iomgr_msg* msg) { + channel.push(msg); + channel_baton.post(); + return true; +} + +iomgr_msg* IOFiberFollyImpl::pop_msg() { + iomgr_msg* msg{nullptr}; + channel_baton.wait(); + // LOGMSG_ASSERT_NE(channel.empty(), true, "Fiber channel baton wokenup but no msg in queue"); + if (!channel.empty()) { + msg = channel.front(); + channel.pop(); + } + return msg; +} + +void IOFiberFollyImpl::close_channel() { + std::queue< iomgr_msg* > empty; + std::swap(channel, empty); +} +} // namespace iomgr +#endif diff --git a/src/lib/reactor/reactor.cpp b/src/lib/reactor/reactor.cpp index 4b03bb7..df470fb 100644 --- a/src/lib/reactor/reactor.cpp +++ b/src/lib/reactor/reactor.cpp @@ -60,11 +60,11 @@ void IOReactor::run(int worker_slot_num, loop_type_t ltype, uint32_t num_fibers, m_reactor_num = sisl::ThreadLocalContext::my_thread_num(); m_reactor_name = name.empty() ? fmt::format("{}-{}", m_reactor_num, loop_type()) : name; - REACTOR_LOG(INFO, , , "IOReactor {} started of loop type={} and assigned reactor id {}", m_reactor_name, + REACTOR_LOG(INFO, "IOReactor {} started of loop type={} and assigned reactor id {}", m_reactor_name, loop_type(), m_reactor_num); init(num_fibers); - if (m_keep_running) { REACTOR_LOG(INFO, , , "IOReactor is ready to go to listen loop"); } + if (m_keep_running) { REACTOR_LOG(INFO, "IOReactor is ready to go to listen loop"); } } if (!m_user_controlled_loop && m_keep_running) { @@ -100,10 +100,10 @@ void IOReactor::init(uint32_t num_fibers) { iface->on_reactor_start(this); ++added_iface; } else { - REACTOR_LOG(INFO, , , "{} with scope={} ignored to add", iface->name(), iface->scope()); + REACTOR_LOG(INFO, "{} with scope={} ignored to add", iface->name(), iface->scope()); } }); - REACTOR_LOG(INFO, , , "Reactor added {} interfaces", added_iface); + REACTOR_LOG(INFO, "Reactor added {} interfaces", added_iface); m_rand_fiber_dist = std::uniform_int_distribution< size_t >(0, m_io_fibers.size() - 1); m_rand_sync_fiber_dist = std::uniform_int_distribution< size_t >(1, m_io_fibers.size() - 1); @@ -151,10 +151,10 @@ void IOReactor::stop() { iface->on_reactor_stop(this); ++removed_iface; } else { - REACTOR_LOG(INFO, , , "{} with scope={} ignored to remove", iface->name(), iface->scope()); + REACTOR_LOG(INFO, "{} with scope={} ignored to remove", iface->name(), iface->scope()); } }); - REACTOR_LOG(INFO, , , "Reactor stop removed {} interfaces", removed_iface); + REACTOR_LOG(INFO, "Reactor stop removed {} interfaces", removed_iface); for (size_t i{1}; i < m_io_fibers.size(); ++i) { auto msg = iomgr_msg::create([]() {}); // Send empty message for loop to come out and yield @@ -197,7 +197,7 @@ void IOReactor::fiber_loop(IOFiber* fiber) { iomgr_msg* msg; while (true) { if ((msg = fiber->pop_msg()) != nullptr) { - REACTOR_LOG(DEBUG, , , "Fiber {} picked the msg and handling it", fiber->ordinal); + REACTOR_LOG(DEBUG, "Fiber {} picked the msg and handling it", fiber->ordinal); handle_msg(msg); } diff --git a/src/lib/reactor/reactor.hpp b/src/lib/reactor/reactor.hpp index a7e9200..6f7778b 100644 --- a/src/lib/reactor/reactor.hpp +++ b/src/lib/reactor/reactor.hpp @@ -30,16 +30,14 @@ struct spdk_nvmf_qpair; struct spdk_bdev; namespace iomgr { -#define REACTOR_LOG(level, mod, thr_addr, __l, ...) \ +#define REACTOR_LOG(level, __l, ...) \ { \ LOG##level##MOD_FMT( \ - BOOST_PP_IF(BOOST_PP_IS_EMPTY(mod), base, mod), \ - ([&](fmt::memory_buffer& buf, const char* __m, auto&&... args) -> bool { \ + iomgr, ([&](fmt::memory_buffer& buf, const char* __m, auto&&... args) -> bool { \ fmt::vformat_to(fmt::appender(buf), fmt::string_view{"[{}:{}] "}, \ fmt::make_format_args(file_name(__FILE__), __LINE__)); \ - fmt::vformat_to( \ - fmt::appender(buf), fmt::string_view{"[IOThread {}.{}] "}, \ - fmt::make_format_args(m_reactor_num, (BOOST_PP_IF(BOOST_PP_IS_EMPTY(thr_addr), "*", thr_addr)))); \ + fmt::vformat_to(fmt::appender(buf), fmt::string_view{"[IOThread {}.{}] "}, \ + fmt::make_format_args(m_reactor_num, m_fiber_mgr_lib->iofiber_self_ordinal())); \ fmt::vformat_to(fmt::appender(buf), fmt::string_view{__m}, fmt::make_format_args(args...)); \ return true; \ }), \ diff --git a/src/lib/spdk/reactor_spdk.cpp b/src/lib/spdk/reactor_spdk.cpp index 23a6fb0..9c82422 100644 --- a/src/lib/spdk/reactor_spdk.cpp +++ b/src/lib/spdk/reactor_spdk.cpp @@ -127,7 +127,7 @@ void IOReactorSPDK::stop_impl() { void IOReactorSPDK::add_external_spdk_thread(struct spdk_thread* sthread) { m_external_spdk_threads.push_back(sthread); - REACTOR_LOG(INFO, , , "Added External SPDK Thread {} to this reactor", spdk_thread_get_name(sthread)); + REACTOR_LOG(INFO, "Added External SPDK Thread {} to this reactor", spdk_thread_get_name(sthread)); } int IOReactorSPDK::add_iodev_impl(const io_device_ptr& iodev) {