Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide IOManagerExecutor for use with folly::Futures library. #52

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class IOMgrConan(ConanFile):
'shared': False,
'fPIC': True,
'coverage': False,
'grpc_support': True,
'grpc_support': False,
'sanitize': False,
'spdk': True,
'testing': 'epoll_mode',
Expand Down
4 changes: 2 additions & 2 deletions prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ echo -n "dpdk."
conan export 3rd_party/dpdk
echo -n "fio."
conan export 3rd_party/fio
echo -n "gprc_internal."
conan export 3rd_party/grpc_internal
#echo -n "gprc_internal."
#conan export 3rd_party/grpc_internal
echo -n "spdk."
conan export 3rd_party/spdk

Expand Down
25 changes: 25 additions & 0 deletions src/include/iomgr/iomgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <semver200.h>
#include <boost/fiber/all.hpp>
#include <folly/Executor.h>
#include <sisl/fds/bitword.hpp>
#include <sisl/fds/buffer.hpp>
#include <sisl/fds/id_reserver.hpp>
Expand All @@ -44,6 +45,25 @@
#include <iomgr/io_device.hpp>
#include <iomgr/fiber_lib.hpp>

namespace folly {

/**
* @class folly::IOManagerExecutor
*
* A folly::Executor for which to call `.via()` with when returning a folly::SemiFuture
* for runtime erasure.
*
**/
class IOManagerExecutor : public Executor {
iomgr::io_fiber_t _fiber;

public:
IOManagerExecutor(iomgr::io_fiber_t fiber) : _fiber(fiber) {}
void add(Func fn) override;
};

} // namespace folly

namespace iomgr {

struct timer_info;
Expand Down Expand Up @@ -99,6 +119,8 @@ class IOManager {
friend class IOManagerSpdkImpl;
friend class IOManagerEpollImpl;

friend void folly::IOManagerExecutor::add(folly::Func);

static IOManager& instance() {
static IOManager inst;
return inst;
Expand Down Expand Up @@ -253,6 +275,8 @@ class IOManager {
}
}

folly::IOManagerExecutor* fiberExecutor(io_fiber_t fiber);

///////////////////////////// Access related methods /////////////////////////////
GenericIOInterface* generic_interface() { return m_default_general_iface.get(); }
GrpcInterface* grpc_interface() { return m_default_grpc_iface.get(); }
Expand Down Expand Up @@ -371,6 +395,7 @@ class IOManager {

std::shared_mutex m_iface_list_mtx;
std::vector< std::shared_ptr< IOInterface > > m_iface_list;
std::map< io_fiber_t, folly::IOManagerExecutor > m_fiber_executors;
std::vector< std::shared_ptr< DriveInterface > > m_drive_ifaces;

std::shared_ptr< GenericIOInterface > m_default_general_iface;
Expand Down
20 changes: 20 additions & 0 deletions src/lib/iomgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ void IOManager::stop() {
// m_default_grpc_iface.reset();
m_drive_ifaces.clear();
m_iface_list.clear();
m_fiber_executors.clear();
} catch (const std::exception& e) { LOGCRITICAL_AND_FLUSH("Caught exception {} during clear lists", e.what()); }
assert(get_state() == iomgr_state::stopped);

Expand Down Expand Up @@ -314,6 +315,11 @@ void IOManager::foreach_interface(const interface_cb_t& iface_cb) {
}
}

folly::IOManagerExecutor* IOManager::fiberExecutor(io_fiber_t fiber) {
std::shared_lock lg(m_iface_list_mtx);
return &m_fiber_executors.at(fiber);
}

void IOManager::_run_io_loop(int iomgr_slot_num, loop_type_t loop_type, uint32_t num_fibers, const std::string& name,
const iodev_selector_t& iodev_selector, thread_state_notifier_t&& addln_notifier) {
loop_type_t ltype = loop_type;
Expand All @@ -336,6 +342,11 @@ void IOManager::reactor_started(shared< IOReactor > reactor) {
m_yet_to_stop_nreactors.increment();
if (reactor->is_worker()) {
m_worker_reactors[reactor->m_worker_slot_num] = reactor;
for (auto const& fiber : reactor->sync_io_capable_fibers()) {
auto lg = std::scoped_lock(m_iface_list_mtx);
auto [_, happened] = m_fiber_executors.emplace(fiber, folly::IOManagerExecutor(fiber));
RELEASE_ASSERT(happened, "Failed to Bind folly::Executor!");
}
reactor->notify_thread_state(true);

// All iomgr created reactors are initialized, move iomgr to sys init (next phase of start)
Expand Down Expand Up @@ -374,6 +385,15 @@ int IOManager::run_on_forget(io_fiber_t fiber, spdk_msg_signature_t fn, void* co
return 1;
}

} // namespace iomgr

namespace folly {
void IOManagerExecutor::add(Func fn) {
iomanager.send_msg(_fiber, iomgr::iomgr_msg::create(std::move(fn).asStdFunction()));
}
} // namespace folly

namespace iomgr {
int IOManager::send_msg(io_fiber_t fiber, iomgr_msg* msg) {
int ret{0};
if (fiber->spdk_thr) {
Expand Down
46 changes: 18 additions & 28 deletions src/test/test_fiber_shared_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ class SharedMutexTest : public testing::Test {
iomgr::FiberManagerLib::shared_mutex m_cb_mtx;
std::vector< iomgr::io_fiber_t > m_fibers;
uint64_t m_count_per_fiber{0};
std::mutex m_test_done_mtx;
std::condition_variable m_test_done_cv;
uint32_t m_test_count{0};
std::atomic_uint32_t m_test_count{0};

protected:
void SetUp() override {
Expand Down Expand Up @@ -88,10 +86,7 @@ class SharedMutexTest : public testing::Test {
}

LOGINFO("Fiber completed {} of exclusive locks", m_count_per_fiber);
{
std::unique_lock lg(m_test_done_mtx);
if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
}
--m_test_count;
}

void all_reader() {
Expand All @@ -100,10 +95,7 @@ class SharedMutexTest : public testing::Test {
}

LOGINFO("Fiber completed {} of shared locks", m_count_per_fiber);
{
std::unique_lock lg(m_test_done_mtx);
if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
}
--m_test_count;
}

void random_reader_writer() {
Expand All @@ -124,10 +116,7 @@ class SharedMutexTest : public testing::Test {
}

LOGINFO("Fiber completed shared_locks={} exclusive_locks={}", read_count, write_count);
{
std::unique_lock lg(m_test_done_mtx);
if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
}
--m_test_count;
}

void write_once() {
Expand All @@ -145,26 +134,27 @@ class SharedMutexTest : public testing::Test {
};

TEST_F(SharedMutexTest, single_writer_multiple_readers) {
iomanager.run_on_forget(m_fibers[0], [this]() { all_writer(); });
auto e = folly::QueuedImmediateExecutor();
auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
calls.push_back(
folly::makeSemiFuture().via(iomanager.fiberExecutor(m_fibers[0])).thenValue([this](auto) { all_writer(); }));
for (auto it = m_fibers.begin() + 1; it < m_fibers.end(); ++it) {
iomanager.run_on_forget(*it, [this]() { all_reader(); });
}

{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
calls.push_back(
folly::makeSemiFuture().via(iomanager.fiberExecutor(*it)).thenValue([this](auto) { all_reader(); }));
}
folly::collectAll(calls).via(&e).get();
EXPECT_EQ(0, m_test_count.load());
}

TEST_F(SharedMutexTest, random_reader_writers) {
auto e = folly::QueuedImmediateExecutor();
auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
for (const auto& f : m_fibers) {
iomanager.run_on_forget(f, [this]() { random_reader_writer(); });
}

{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
calls.push_back(
folly::makeSemiFuture().via(iomanager.fiberExecutor(f)).thenValue([this](auto) { all_reader(); }));
}
folly::collectAll(calls).via(&e).get();
EXPECT_EQ(0, m_test_count.load());
}

int main(int argc, char* argv[]) {
Expand Down