From bb808e4da4b35e95bc2a633de6029d0e4934af05 Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 14:19:15 +0800 Subject: [PATCH 01/13] [coro_io] add support for cancellation --- include/ylt/coro_io/coro_io.hpp | 407 +++++++++++------- include/ylt/coro_io/io_context_pool.hpp | 14 +- include/ylt/thirdparty/async_simple/Signal.h | 11 +- .../thirdparty/async_simple/coro/Collect.h | 87 ++-- .../thirdparty/async_simple/coro/CountEvent.h | 16 +- src/coro_io/tests/CMakeLists.txt | 1 + src/coro_io/tests/test_cancel.cpp | 184 ++++++++ src/coro_io/tests/test_client_pool.cpp | 1 + .../examples/base_examples/client_pool.cpp | 4 +- .../base_examples/concurrent_clients.cpp | 4 +- 10 files changed, 500 insertions(+), 229 deletions(-) create mode 100644 src/coro_io/tests/test_cancel.cpp diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index e26fab75c6..aabbd3c614 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -21,6 +21,22 @@ #include #include +#include +#include +#include +#include +#include +#include +#include + +#include "asio/dispatch.hpp" +#include "asio/io_context.hpp" +#include "async_simple/Signal.h" +#include "async_simple/coro/FutureAwaiter.h" +#include "async_simple/coro/SpinLock.h" +#include "ylt/easylog.hpp" +#include "ylt/util/type_traits.h" + #if defined(YLT_ENABLE_SSL) || defined(CINATRA_ENABLE_SSL) #include #endif @@ -100,6 +116,14 @@ class callback_awaitor_base { obj->arg_ = {std::forward(args)...}; } } + void set_value(std::error_code &&ec) const { + if constexpr (std::is_same_v) { + obj->arg_ = std::move(ec); + } + else { + std::get<0>(obj->arg_) = std::move(ec); + } + } void resume() const { obj->coro_.resume(); } private: @@ -127,134 +151,236 @@ template <> class callback_awaitor : public callback_awaitor_base> {}; -inline async_simple::coro::Lazy async_accept( - asio::ip::tcp::acceptor &acceptor, asio::ip::tcp::socket &socket) noexcept { - callback_awaitor awaitor; +template +struct post_helper { + void operator()(auto handler) { + asio::post(e, [this, handler]() { + try { + if constexpr (std::is_same_v>) { + func(); + handler.resume(); + } + else { + auto r = func(); + handler.set_value_then_resume(std::move(r)); + } + } catch (const std::exception &e) { + R er; + er.setException(std::current_exception()); + handler.set_value_then_resume(std::move(er)); + } + }); + } + Executor e; + Func func; +}; - co_return co_await awaitor.await_resume([&](auto handler) { - acceptor.async_accept(socket, [&, handler](const auto &ec) mutable { - handler.set_value_then_resume(ec); +template +inline async_simple::coro::Lazy< + async_simple::Try::return_type>> +post(Func func, Executor executor) { + using R = + async_simple::Try::return_type>; + + callback_awaitor awaitor; + post_helper helper{executor, std::move(func)}; + co_return co_await awaitor.await_resume(helper); +} + +template +inline async_simple::coro::Lazy< + async_simple::Try::return_type>> +post(Func func, + coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) { + return post(std::move(func), e->get_asio_executor()); +} + +template +inline async_simple::coro::Lazy async_io(IO_func io_func, + io_object &obj) noexcept { + callback_awaitor awaitor; + auto slot = co_await async_simple::coro::CurrentSlot{}; + if (!slot) { + co_return co_await awaitor.await_resume([&](auto handler) { + io_func([&, handler](auto &&...args) mutable { + handler.set_value(std::forward(args)...); + handler.resume(); + }); }); - }); + } + else { + auto executor = obj.get_executor(); + auto lock = std::make_shared>(); + bool hasCanceled; + auto result = co_await awaitor.await_resume( + [&, &lock_ref = lock](auto handler) mutable { + auto lock = lock_ref; + hasCanceled = !slot->emplace( + async_simple::SignalType::Terminate, + [&obj, weak_lock = std::weak_ptr{lock}, executor = executor]( + async_simple::SignalType signalType, + async_simple::Signal *signal) { + asio::dispatch( + executor, [&obj, weak_lock = std::move(weak_lock)]() { + if (auto ptr = weak_lock.lock(); ptr) { + bool expected = false; + if (!ptr->compare_exchange_strong( + expected, true, std::memory_order_release)) { + obj.cancel(); + } + } + }); + }); + if (hasCanceled) { + asio::dispatch(executor, [handler]() { + handler.set_value( + std::make_error_code(std::errc::operation_canceled)); + handler.resume(); + }); + } + else { + io_func([&, handler](auto &&...args) mutable { + slot->clear(async_simple::Terminate); + handler.set_value(std::forward(args)...); + handler.resume(); + }); + bool expected = false; + if (!lock->compare_exchange_strong(expected, true, + std::memory_order_release)) { + obj.cancel(); + } + lock = nullptr; + } + }); + if (!hasCanceled) { + auto weak_lock = std::weak_ptr{lock}; + lock = nullptr; + // wait cancel finish to make sure io object's life-time + for (; weak_lock.lock();) { + std::cout << "SHIT" << std::endl; + co_await coro_io::post( + []() { + }, + executor); + } + } + co_return result; + } +} + +inline async_simple::coro::Lazy async_accept( + asio::ip::tcp::acceptor &acceptor, asio::ip::tcp::socket &socket) noexcept { + return async_io( + [&](auto &&cb) { + acceptor.async_accept(socket, std::move(cb)); + }, + acceptor); } template inline async_simple::coro::Lazy> async_read_some(Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - socket.async_read_some(buffer, [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + socket.async_read_some(buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_read_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_read_at(socket, offset, buffer, - [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + asio::async_read_at(socket, offset, buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_read(socket, buffer, [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + asio::async_read(socket, buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer &buffer, size_t size_to_read) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_read(socket, buffer, asio::transfer_exactly(size_to_read), - [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + asio::async_read(socket, buffer, asio::transfer_exactly(size_to_read), + std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_read_until(Socket &socket, AsioBuffer &buffer, asio::string_view delim) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_read_until(socket, buffer, delim, - [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&, delim](auto &&cb) { + asio::async_read_until(socket, buffer, delim, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_write( Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_write(socket, buffer, [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + asio::async_write(socket, buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_write_some(Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - socket.async_write_some(buffer, [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&](auto &&cb) { + socket.async_write_some(buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy> async_write_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { - callback_awaitor> awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_write_at(socket, offset, buffer, - [&, handler](const auto &ec, auto size) { - handler.set_value_then_resume(ec, size); - }); - }); + return async_io>( + [&, offset](auto &&cb) { + asio::async_write_at(socket, offset, buffer, std::move(cb)); + }, + socket); } template inline async_simple::coro::Lazy async_connect( executor_t *executor, asio::ip::tcp::socket &socket, const std::string &host, const std::string &port) noexcept { - callback_awaitor awaitor; asio::ip::tcp::resolver resolver(executor->get_asio_executor()); - asio::ip::tcp::resolver::iterator iterator; - auto ec = co_await awaitor.await_resume([&](auto handler) { - resolver.async_resolve(host, port, [&, handler](auto ec, auto it) { - iterator = it; - handler.set_value_then_resume(ec); - }); - }); - - if (ec) { - co_return ec; + auto result = co_await async_io< + std::pair>( + [&](auto &&cb) { + resolver.async_resolve(host, port, std::move(cb)); + }, + resolver); + + if (result.first) { + co_return result.first; } - - co_return co_await awaitor.await_resume([&](auto handler) { - asio::async_connect(socket, iterator, - [&, handler](const auto &ec, const auto &) mutable { - handler.set_value_then_resume(ec); - }); - }); + result = co_await async_io< + std::pair>( + [&](auto &&cb) { + asio::async_connect(socket, result.second, std::move(cb)); + }, + socket); + co_return result.first; } template @@ -262,7 +388,7 @@ inline async_simple::coro::Lazy async_close(Socket &socket) noexcept { callback_awaitor awaitor; auto executor = socket.get_executor(); co_return co_await awaitor.await_resume([&](auto handler) { - asio::post(executor, [&, handler]() { + asio::dispatch(executor, [&, handler]() { asio::error_code ignored_ec; socket.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); socket.close(ignored_ec); @@ -274,12 +400,11 @@ inline async_simple::coro::Lazy async_close(Socket &socket) noexcept { #if defined(YLT_ENABLE_SSL) || defined(CINATRA_ENABLE_SSL) inline async_simple::coro::Lazy async_handshake( auto &ssl_stream, asio::ssl::stream_base::handshake_type type) noexcept { - callback_awaitor awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - ssl_stream->async_handshake(type, [&, handler](const auto &ec) { - handler.set_value_then_resume(ec); - }); - }); + return async_io( + [&, type](auto &&cb) { + ssl_stream->async_handshake(type, std::move(cb)); + }, + ssl_stream); } #endif class period_timer : public asio::steady_timer { @@ -290,28 +415,31 @@ class period_timer : public asio::steady_timer { : asio::steady_timer(executor->get_asio_executor()) {} async_simple::coro::Lazy async_await() noexcept { - callback_awaitor awaitor; - - co_return co_await awaitor.await_resume([&](auto handler) { - this->async_wait([&, handler](const auto &ec) { - handler.set_value_then_resume(!ec); - }); - }); + auto ec = co_await async_io( + [&](auto &&cb) { + this->async_wait(std::move(cb)); + }, + *this); + co_return !ec; } }; template -inline async_simple::coro::Lazy sleep_for(const Duration &d, - Executor *e) { +inline async_simple::coro::Lazy sleep_for(Duration d, Executor *e) { coro_io::period_timer timer(e); timer.expires_after(d); - co_await timer.async_await(); + co_return co_await timer.async_await(); } template -inline async_simple::coro::Lazy sleep_for(Duration d) { +inline async_simple::coro::Lazy sleep_for(Duration d) { if (auto executor = co_await async_simple::CurrentExecutor(); executor != nullptr) { - co_await async_simple::coro::sleep(d); + try { + co_await async_simple::coro::sleep(d); + } catch (const async_simple::SignalException &e) { + co_return false; + } + co_return true; } else { co_return co_await sleep_for(d, @@ -319,55 +447,12 @@ inline async_simple::coro::Lazy sleep_for(Duration d) { } } -template -struct post_helper { - void operator()(auto handler) { - asio::post(e, [this, handler]() { - try { - if constexpr (std::is_same_v>) { - func(); - handler.resume(); - } - else { - auto r = func(); - handler.set_value_then_resume(std::move(r)); - } - } catch (const std::exception &e) { - R er; - er.setException(std::current_exception()); - handler.set_value_then_resume(std::move(er)); - } - }); - } - Executor e; - Func func; -}; - -template -inline async_simple::coro::Lazy< - async_simple::Try::return_type>> -post(Func func, Executor executor) { - using R = - async_simple::Try::return_type>; - - callback_awaitor awaitor; - post_helper helper{executor, std::move(func)}; - co_return co_await awaitor.await_resume(helper); -} - -template -inline async_simple::coro::Lazy< - async_simple::Try::return_type>> -post(Func func, - coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) { - co_return co_await post(std::move(func), e->get_asio_executor()); -} - template struct channel : public asio::experimental::channel { using return_type = R; - using ValueType = std::pair; - using asio::experimental::channel::channel; + using value_type = std::pair; + using base_type = asio::experimental::channel; + using base_type::base_type; channel(coro_io::ExecutorWrapper<> *executor, size_t capacity) : executor_(executor), asio::experimental::channel( @@ -399,13 +484,11 @@ inline async_simple::coro::Lazy async_send( if (r) { co_return std::error_code{}; } - callback_awaitor awaitor; - co_return co_await awaitor.await_resume( - [&, val = std::move(val)](auto handler) { - channel.async_send({}, std::move(val), [handler](const auto &ec) { - handler.set_value_then_resume(ec); - }); - }); + co_return co_await async_io( + [&](auto &&cb) { + channel.async_send({}, std::move(val), std::move(cb)); + }, + channel); } template @@ -420,13 +503,12 @@ async_simple::coro::Lazy> - awaitor; - co_return co_await awaitor.await_resume([&](auto handler) { - channel.async_receive([handler](auto ec, auto val) { - handler.set_value_then_resume(std::make_pair(ec, std::move(val))); - }); - }); + co_return co_await async_io< + std::pair>( + [&](auto &&cb) { + channel.async_receive(std::move(cb)); + }, + *(typename Channel::base_type *)&channel); } template @@ -509,14 +591,13 @@ async_sendfile(asio::ip::tcp::socket &socket, int fd, off_t offset, // Check if we need to run the operation again. if (ec == asio::error::would_block || ec == asio::error::try_again) [[unlikely]] { - callback_awaitor non_block_awaitor; // We have to wait for the socket to become ready again. - ec = co_await non_block_awaitor.await_resume([&](auto handler) { - socket.async_wait(asio::ip::tcp::socket::wait_write, - [handler](const auto &ec) { - handler.set_value_then_resume(ec); - }); - }); + ec = co_await async_io( + [&](auto &&cb) { + socket.async_wait(asio::ip::tcp::socket::wait_write, + std::move(cb)); + }, + socket); } if (ec || n == 0 || least_bytes == 0) [[unlikely]] { // End of File break; diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 0e725f9eb2..80862a9050 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -31,6 +31,7 @@ #include #include "asio/dispatch.hpp" +#include "async_simple/Signal.h" #ifdef __linux__ #include #include @@ -104,11 +105,18 @@ class ExecutorWrapper : public async_simple::Executor { } void schedule(Func func, Duration dur, uint64_t hint, async_simple::Slot *slot = nullptr) override { - auto timer = std::make_unique(executor_, dur); - auto tm = timer.get(); - tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) { + auto timer = std::make_shared(executor_, dur); + timer->async_wait([fn = std::move(func), timer](const auto &ec) { fn(); }); + if (!async_simple::signalHelper{async_simple::SignalType::Terminate} + .tryEmplace(slot, [timer](auto signalType, auto *signal) mutable { + asio::dispatch(timer->get_executor(), [timer]() { + timer->cancel(); + }); + })) { + timer->cancel(); + } } }; diff --git a/include/ylt/thirdparty/async_simple/Signal.h b/include/ylt/thirdparty/async_simple/Signal.h index 8f5dfd302f..055a60a87b 100644 --- a/include/ylt/thirdparty/async_simple/Signal.h +++ b/include/ylt/thirdparty/async_simple/Signal.h @@ -17,7 +17,6 @@ #define ASYNC_SIMPLE_SIGNAL_H #ifndef ASYNC_SIMPLE_USE_MODULES - #include #include #include @@ -205,14 +204,16 @@ class Slot { "we dont allow emplace an empty signal handler"); logicAssert(std::popcount(static_cast(type)) == 1, "It's not allow to emplace for multiple signals"); - // trigger-once signal has already been triggered + auto handler = std::make_unique( + std::forward(args)...); + auto oldHandlerPtr = loadHandler(type); + // check trigger-once signal has already been triggered + // if signal has already been triggered, return false if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) && (signal()->state() & type)) { return false; } - auto handler = std::make_unique( - std::forward(args)...); - auto oldHandlerPtr = loadHandler(type); + // if signal triggered later, we will found it by cas failed. auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire); if (oldHandler == &detail::SignalSlotSharedState::HandlerManager::emittedTag) { diff --git a/include/ylt/thirdparty/async_simple/coro/Collect.h b/include/ylt/thirdparty/async_simple/coro/Collect.h index cb224556a0..447acb2c8f 100644 --- a/include/ylt/thirdparty/async_simple/coro/Collect.h +++ b/include/ylt/thirdparty/async_simple/coro/Collect.h @@ -166,8 +166,8 @@ struct CollectAnyAwaiter { _slot, [c = continuation, e = event, size = input.size()]( SignalType type, Signal*) mutable { auto count = e->downCount(); - if (count > size + 1) { - c.resume(); + if (count == size + 1) { + c.resume(); } })) { // has canceled return false; @@ -186,14 +186,14 @@ struct CollectAnyAwaiter { assert(e != nullptr); auto count = e->downCount(); // n+1: n coro + 1 cancel handler - if (count > size + 1) { - _result = std::make_unique(); - _result->_idx = i; - _result->_value = std::move(result); - if (auto ptr = local->getSlot(); ptr) { - ptr->signal()->emit(_SignalType); - } - c.resume(); + if (count == size + 1) { + _result = std::make_unique(); + _result->_idx = i; + _result->_value = std::move(result); + if (auto ptr = local->getSlot(); ptr) { + ptr->signal()->emit(_SignalType); + } + c.resume(); } }); } // end for @@ -268,8 +268,8 @@ struct CollectAnyVariadicAwaiter { _slot, [c = continuation, e = event](SignalType type, Signal*) mutable { auto count = e->downCount(); - if (count > std::tuple_size() + 1) { - c.resume(); + if (count == std::tuple_size() + 1) { + c.resume(); } })) { // has canceled return false; @@ -290,13 +290,13 @@ struct CollectAnyVariadicAwaiter { res) mutable { auto count = e->downCount(); // n+1: n coro + 1 cancel handler - if (count > std::tuple_size() + 1) { - _result = std::make_unique( - std::in_place_index_t(), std::move(res)); - if (auto ptr = local->getSlot(); ptr) { - ptr->signal()->emit(_SignalType); - } - c.resume(); + if (count == std::tuple_size() + 1) { + _result = std::make_unique( + std::in_place_index_t(), std::move(res)); + if (auto ptr = local->getSlot(); ptr) { + ptr->signal()->emit(_SignalType); + } + c.resume(); } }); }(), @@ -388,15 +388,19 @@ struct CollectAllAwaiter { _slot->chainedSignal(_signal.get()); auto executor = promise_type._executor; - for (size_t i = 0; i < _input.size(); ++i) { - auto& exec = _input[i]._coro.promise()._executor; - if (exec == nullptr) { - exec = executor; - } - std::unique_ptr local; - local = std::make_unique(_signal.get()); - _input[i]._coro.promise()._lazy_local = local.get(); - auto&& func = [this, i, local = std::move(local)]() mutable { + + _event.setAwaitingCoro(continuation); + auto size = _input.size(); + for (size_t i = 0; i < size; ++i) { + auto& exec = _input[i]._coro.promise()._executor; + if (exec == nullptr) { + exec = executor; + } + std::unique_ptr local; + local = std::make_unique(_signal.get()); + _input[i]._coro.promise()._lazy_local = local.get(); + auto&& func = + [this, i, local = std::move(local)]() mutable { _input[i].start([this, i, local = std::move(local)]( Try&& result) { _output[i] = std::move(result); @@ -412,20 +416,15 @@ struct CollectAllAwaiter { awaitingCoro.resume(); } }); - }; - if (Para == true && _input.size() > 1) { - if (exec != nullptr) - AS_LIKELY { - exec->schedule_move_only(std::move(func)); - continue; - } - } - func(); - } - _event.setAwaitingCoro(continuation); - auto awaitingCoro = _event.down(); - if (awaitingCoro) { - awaitingCoro.resume(); + }; + if (Para == true && _input.size() > 1) { + if (exec != nullptr) + AS_LIKELY { + exec->schedule_move_only(std::move(func)); + continue; + } + } + func(); } } inline auto await_resume() { return std::move(_output); } @@ -602,10 +601,6 @@ struct CollectAllVariadicAwaiter { } }(std::get(_inputs), std::get(_results)), ...); - - if (auto awaitingCoro = _event.down(); awaitingCoro) { - awaitingCoro.resume(); - } } void await_suspend(std::coroutine_handle<> continuation) { diff --git a/include/ylt/thirdparty/async_simple/coro/CountEvent.h b/include/ylt/thirdparty/async_simple/coro/CountEvent.h index ad33b37847..52366551d7 100644 --- a/include/ylt/thirdparty/async_simple/coro/CountEvent.h +++ b/include/ylt/thirdparty/async_simple/coro/CountEvent.h @@ -34,15 +34,15 @@ namespace detail { // The last 'down' will resume the awaiting coroutine on this event. class CountEvent { public: - CountEvent(size_t count) : _count(count + 1) {} - CountEvent(const CountEvent&) = delete; - CountEvent(CountEvent&& other) - : _count(other._count.exchange(0, std::memory_order_relaxed)), - _awaitingCoro(std::exchange(other._awaitingCoro, nullptr)) {} + CountEvent(size_t count) : _count(count) {} + CountEvent(const CountEvent&) = delete; + CountEvent(CountEvent&& other) + : _count(other._count.exchange(0, std::memory_order_relaxed)), + _awaitingCoro(std::exchange(other._awaitingCoro, nullptr)) {} - [[nodiscard]] CoroHandle<> down(size_t n = 1) { - std::size_t oldCount; - return down(oldCount, n); + [[nodiscard]] CoroHandle<> down(size_t n = 1) { + std::size_t oldCount; + return down(oldCount, n); } [[nodiscard]] CoroHandle<> down(size_t& oldCount, std::size_t n) { // read acquire and write release, _awaitingCoro store can not be diff --git a/src/coro_io/tests/CMakeLists.txt b/src/coro_io/tests/CMakeLists.txt index 97995dbb86..1ffd75b472 100644 --- a/src/coro_io/tests/CMakeLists.txt +++ b/src/coro_io/tests/CMakeLists.txt @@ -5,6 +5,7 @@ add_executable(coro_io_test test_client_pool.cpp test_rate_limiter.cpp test_coro_channel.cpp + test_cancel.cpp main.cpp ) if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64 diff --git a/src/coro_io/tests/test_cancel.cpp b/src/coro_io/tests/test_cancel.cpp new file mode 100644 index 0000000000..f39fd399a0 --- /dev/null +++ b/src/coro_io/tests/test_cancel.cpp @@ -0,0 +1,184 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/Executor.h" +#include "async_simple/Promise.h" +#include "async_simple/Signal.h" +#include "async_simple/Unit.h" +#include "async_simple/coro/ConditionVariable.h" +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Semaphore.h" +#include "async_simple/coro/Sleep.h" +#include "async_simple/coro/SpinLock.h" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" +#include "ylt/coro_rpc/impl/expected.hpp" +#include "ylt/easylog.hpp" +#include "ylt/easylog/record.hpp" +using namespace std::chrono_literals; +using namespace async_simple::coro; + +Lazy my_sleep(std::chrono::milliseconds ms, auto executor, + bool if_twice) { + auto result = co_await coro_io::sleep_for(ms, executor); + if (if_twice && result) { + try { + co_await async_simple::coro::sleep(ms); + } catch (const async_simple::SignalException& e) { + result = false; + } + } + co_return result; +} +async_simple::coro::Lazy test_cancel(coro_io::io_context_pool& io_pool, + std::chrono::milliseconds ms, + coro_io::ExecutorWrapper<>* executor, + bool if_sleep_twice, + bool is_same_sec = false, + bool order = false) { + std::vector> tasks; + for (int i = 1000; i >= 0; --i) { + tasks.emplace_back(my_sleep(is_same_sec ? ms : ((i == 0) ? (0 * ms) : ms), + executor ? executor : io_pool.get_executor(), + if_sleep_twice) + .via(executor ? executor : io_pool.get_executor())); + } + if (order) { + std::vector> tasks2; + while (tasks.size()) { + tasks2.emplace_back(std::move(tasks.back())); + tasks.pop_back(); + } + tasks = std::move(tasks2); + } + auto result = co_await collectAll(std::move(tasks)); + + if (ms.count() && !is_same_sec) { + if (order) { + CHECK(result.front().value() == true); + for (int i = 1; i <= 1000; ++i) { + CHECK(result[i].value() == false); + } + } + else { + CHECK(result.back().value() == true); + result.pop_back(); + for (auto& r : result) { + CHECK(r.value() == false); + } + } + } +} + +async_simple::coro::Lazy test_cancel2(coro_io::io_context_pool& io_pool, + std::chrono::milliseconds ms, + bool is_same_sec = false, + bool order = false) { + std::vector> tasks; + for (int i = 1000; i >= 0; --i) { + tasks.emplace_back(my_sleep(is_same_sec ? ms : ((i == 0) ? (0 * ms) : ms), + io_pool.get_executor(), false)); + } + if (order) { + std::vector> tasks2; + while (tasks.size()) { + tasks2.emplace_back(std::move(tasks.back())); + tasks.pop_back(); + } + tasks = std::move(tasks2); + } + auto result = co_await collectAll(std::move(tasks)); + + if (ms.count() && !is_same_sec) { + if (order) { + CHECK(result.front().value() == true); + for (int i = 1; i <= 1000; ++i) { + CHECK(result[i].value() == false); + } + } + else { + CHECK(result.back().value() == true); + result.pop_back(); + for (auto& r : result) { + CHECK(r.value() == false); + } + } + } +} +TEST_CASE("test multithread cancellation") { + // easylog::logger<>::instance().set_min_severity(easylog::Severity::INFO); + coro_io::io_context_pool io_pool(101); + ELOG_TRACE << "test multithread cancellation"; + std::thread thd{[&]() { + ELOG_TRACE << "start thread pool"; + io_pool.run(); + }}; + auto test = [&](coro_io::ExecutorWrapper<>* e) { + for (int sleep_twice = 0; sleep_twice <= 1; ++sleep_twice) { + ELOG_TRACE << "slow work canceled(reverse) test"; + for (int i = 0; i < 100; ++i) { + async_simple::coro::syncAwait( + test_cancel(io_pool, 10000s, e, sleep_twice, false, true)); + } + ELOG_TRACE << "slow work canceled test"; + for (int i = 0; i < 100; ++i) { + async_simple::coro::syncAwait( + test_cancel(io_pool, 10000s, e, sleep_twice)); + } + ELOG_TRACE << "work finished in 0s test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait(test_cancel(io_pool, 0s, e, sleep_twice)); + ELOG_TRACE << "work finished in same time test"; + for (int i = 0; i < 20; ++i) + async_simple::coro::syncAwait( + test_cancel(io_pool, 100ms, e, sleep_twice, true)); + ELOG_TRACE << "work finished in same short time test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait( + test_cancel(io_pool, 1ms, e, sleep_twice, true)); + } + }; + test(nullptr); + test(coro_io::get_global_executor()); + + ELOG_TRACE << "slow work canceled(reverse) test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait(test_cancel2(io_pool, 10000s, false, true)); + ELOG_TRACE << "slow work canceled test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait(test_cancel2(io_pool, 10000s)); + ELOG_TRACE << "work finished in 0s test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait(test_cancel2(io_pool, 0s)); + ELOG_TRACE << "work finished in same time test"; + for (int i = 0; i < 20; ++i) + async_simple::coro::syncAwait(test_cancel2(io_pool, 100ms, true)); + ELOG_TRACE << "work finished in same short time test"; + for (int i = 0; i < 100; ++i) + async_simple::coro::syncAwait(test_cancel2(io_pool, 1ms, true)); + + ELOG_TRACE << "stop thread pool"; + io_pool.stop(); + ELOG_TRACE << "join thread pool"; + thd.join(); + ELOG_DEBUG << "test multithread cancellation over."; +} \ No newline at end of file diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index c3f8cbd99d..71050b0232 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -76,6 +76,7 @@ async_simple::coro::Lazy event( works.emplace_back(backer(pool, op).via(coro_io::get_global_executor())); } auto res = co_await collectAll(std::move(works)); + std::cout << "HI" << std::endl; for (auto &e : res) { if (!e.value()) { co_return false; diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index 165f430173..dbc63bc9ad 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -42,7 +42,7 @@ std::string echo(std::string_view sv); constexpr unsigned thread_cnt = 1920; -constexpr auto request_cnt = 1000; +constexpr auto request_cnt = 100000; using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; @@ -112,7 +112,7 @@ void latency_watcher() { } int main() { auto executor = coro_io::get_global_block_executor(); - for (int i = 0, lim = thread_cnt * 10; i < lim; ++i) { + for (int i = 0, lim = thread_cnt; i < lim; ++i) { coro_io::get_global_executor()->schedule([=]() { send().start([executor](auto&& res) { executor->schedule([res = std::move(res.value())]() mutable { diff --git a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp index bf087a6591..bda5a67f1d 100644 --- a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp +++ b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp @@ -67,7 +67,7 @@ Lazy> send(int id, int cnt) { ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg; continue; } - ++qps; + qps.fetch_add(1, std::memory_order_relaxed); auto old_tp = tp; tp = std::chrono::steady_clock::now(); result.push_back( @@ -113,7 +113,7 @@ int main() { syncAwait(clients.back()->connect("localhost:8801")); } for (int i = 0, lim = thread_cnt; i < lim; ++i) { - send(i, 20000).via(&clients[i]->get_executor()).start([](auto&& res) { + send(i, 100000).via(&clients[i]->get_executor()).start([](auto&& res) { finish_executor->schedule([res = std::move(res.value())] { result.insert(result.end(), res.begin(), res.end()); }); From 62f8d7a28991780cc1f7cf242ea2fb7e05ecbd9c Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 15:22:57 +0800 Subject: [PATCH 02/13] fix --- include/ylt/coro_io/coro_io.hpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index aabbd3c614..334ad2d816 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -279,9 +279,9 @@ inline async_simple::coro::Lazy async_accept( template inline async_simple::coro::Lazy> -async_read_some(Socket &socket, AsioBuffer &&buffer) noexcept { +async_read_some(Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&](auto &&cb) { + [&,buffer](auto &&cb) { socket.async_read_some(buffer, std::move(cb)); }, socket); @@ -289,9 +289,9 @@ async_read_some(Socket &socket, AsioBuffer &&buffer) noexcept { template inline async_simple::coro::Lazy> -async_read_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { +async_read_at(uint64_t offset, Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&](auto &&cb) { + [&socket,buffer,offset](auto &&cb) { asio::async_read_at(socket, offset, buffer, std::move(cb)); }, socket); @@ -299,9 +299,9 @@ async_read_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { template inline async_simple::coro::Lazy> async_read( - Socket &socket, AsioBuffer &&buffer) noexcept { + Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&](auto &&cb) { + [&socket,buffer](auto &&cb) { asio::async_read(socket, buffer, std::move(cb)); }, socket); @@ -311,7 +311,7 @@ template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer &buffer, size_t size_to_read) noexcept { return async_io>( - [&](auto &&cb) { + [&,size_to_read](auto &&cb) { asio::async_read(socket, buffer, asio::transfer_exactly(size_to_read), std::move(cb)); }, @@ -331,9 +331,9 @@ async_read_until(Socket &socket, AsioBuffer &buffer, template inline async_simple::coro::Lazy> async_write( - Socket &socket, AsioBuffer &&buffer) noexcept { + Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&](auto &&cb) { + [&,buffer](auto &&cb) { asio::async_write(socket, buffer, std::move(cb)); }, socket); @@ -341,9 +341,9 @@ inline async_simple::coro::Lazy> async_write( template inline async_simple::coro::Lazy> -async_write_some(Socket &socket, AsioBuffer &&buffer) noexcept { +async_write_some(Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&](auto &&cb) { + [&,buffer](auto &&cb) { socket.async_write_some(buffer, std::move(cb)); }, socket); @@ -351,9 +351,9 @@ async_write_some(Socket &socket, AsioBuffer &&buffer) noexcept { template inline async_simple::coro::Lazy> -async_write_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { +async_write_at(uint64_t offset, Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&, offset](auto &&cb) { + [&, offset,buffer](auto &&cb) { asio::async_write_at(socket, offset, buffer, std::move(cb)); }, socket); From 4b84619fb57480264ebc69aad8c68d15f934f1df Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 15:50:24 +0800 Subject: [PATCH 03/13] fix ssl compile error --- include/ylt/coro_io/coro_io.hpp | 34 ++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 334ad2d816..ace7446982 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -195,6 +195,19 @@ post(Func func, return post(std::move(func), e->get_asio_executor()); } +namespace detail { + +template +void cancel(T &io_object) { + if constexpr (requires { io_object.cancel(); }) { + io_object.cancel(); + } + else { + io_object.lowest_layer().cancel(); + } +} +} // namespace detail + template inline async_simple::coro::Lazy async_io(IO_func io_func, io_object &obj) noexcept { @@ -226,7 +239,7 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, bool expected = false; if (!ptr->compare_exchange_strong( expected, true, std::memory_order_release)) { - obj.cancel(); + detail::cancel(obj); } } }); @@ -247,7 +260,7 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, bool expected = false; if (!lock->compare_exchange_strong(expected, true, std::memory_order_release)) { - obj.cancel(); + detail::cancel(obj); } lock = nullptr; } @@ -257,7 +270,6 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, lock = nullptr; // wait cancel finish to make sure io object's life-time for (; weak_lock.lock();) { - std::cout << "SHIT" << std::endl; co_await coro_io::post( []() { }, @@ -281,7 +293,7 @@ template inline async_simple::coro::Lazy> async_read_some(Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&,buffer](auto &&cb) { + [&, buffer](auto &&cb) { socket.async_read_some(buffer, std::move(cb)); }, socket); @@ -291,7 +303,7 @@ template inline async_simple::coro::Lazy> async_read_at(uint64_t offset, Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&socket,buffer,offset](auto &&cb) { + [&socket, buffer, offset](auto &&cb) { asio::async_read_at(socket, offset, buffer, std::move(cb)); }, socket); @@ -301,7 +313,7 @@ template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&socket,buffer](auto &&cb) { + [&socket, buffer](auto &&cb) { asio::async_read(socket, buffer, std::move(cb)); }, socket); @@ -311,7 +323,7 @@ template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer &buffer, size_t size_to_read) noexcept { return async_io>( - [&,size_to_read](auto &&cb) { + [&, size_to_read](auto &&cb) { asio::async_read(socket, buffer, asio::transfer_exactly(size_to_read), std::move(cb)); }, @@ -333,7 +345,7 @@ template inline async_simple::coro::Lazy> async_write( Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&,buffer](auto &&cb) { + [&, buffer](auto &&cb) { asio::async_write(socket, buffer, std::move(cb)); }, socket); @@ -343,7 +355,7 @@ template inline async_simple::coro::Lazy> async_write_some(Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&,buffer](auto &&cb) { + [&, buffer](auto &&cb) { socket.async_write_some(buffer, std::move(cb)); }, socket); @@ -353,7 +365,7 @@ template inline async_simple::coro::Lazy> async_write_at(uint64_t offset, Socket &socket, AsioBuffer buffer) noexcept { return async_io>( - [&, offset,buffer](auto &&cb) { + [&, offset, buffer](auto &&cb) { asio::async_write_at(socket, offset, buffer, std::move(cb)); }, socket); @@ -404,7 +416,7 @@ inline async_simple::coro::Lazy async_handshake( [&, type](auto &&cb) { ssl_stream->async_handshake(type, std::move(cb)); }, - ssl_stream); + *ssl_stream); } #endif class period_timer : public asio::steady_timer { From 4b5efd602171454d1e4f432448c5a225db398ebf Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 16:43:05 +0800 Subject: [PATCH 04/13] remove useless dispatch --- include/ylt/coro_io/coro_io.hpp | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index ace7446982..7c9f9896d8 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -230,19 +230,16 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, auto lock = lock_ref; hasCanceled = !slot->emplace( async_simple::SignalType::Terminate, - [&obj, weak_lock = std::weak_ptr{lock}, executor = executor]( + [&obj, weak_lock = std::weak_ptr{lock}]( async_simple::SignalType signalType, async_simple::Signal *signal) { - asio::dispatch( - executor, [&obj, weak_lock = std::move(weak_lock)]() { - if (auto ptr = weak_lock.lock(); ptr) { - bool expected = false; - if (!ptr->compare_exchange_strong( - expected, true, std::memory_order_release)) { - detail::cancel(obj); - } - } - }); + if (auto ptr = weak_lock.lock(); ptr) { + bool expected = false; + if (!ptr->compare_exchange_strong( + expected, true, std::memory_order_release)) { + detail::cancel(obj); + } + } }); if (hasCanceled) { asio::dispatch(executor, [handler]() { From e96d870aa59b16d067eb1a175ab538292282887c Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 18:30:16 +0800 Subject: [PATCH 05/13] fix timer user-after-free --- include/ylt/coro_io/coro_io.hpp | 15 +++++++--- include/ylt/coro_io/io_context_pool.hpp | 40 ++++++++++++++++++------- src/coro_io/tests/test_client_pool.cpp | 8 ++--- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 7c9f9896d8..0e3f313a0d 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -111,12 +111,19 @@ class callback_awaitor_base { resume(); } template - void set_value(Args &&...args) const { - if constexpr (!std::is_void_v) { - obj->arg_ = {std::forward(args)...}; + void set_value(std::error_code ec, Args &&...args) const { + if constexpr (!std::is_same_v) { + obj->arg_ = {std::move(ec), std::forward(args)...}; } + else { + obj->arg_ = std::move(ec); + } + } + template + void set_value(Args &&args) const { + obj->arg_ = std::move(args); } - void set_value(std::error_code &&ec) const { + void set_value(std::error_code ec) const { if constexpr (std::is_same_v) { obj->arg_ = std::move(ec); } diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 80862a9050..a7e376f70a 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -105,17 +105,35 @@ class ExecutorWrapper : public async_simple::Executor { } void schedule(Func func, Duration dur, uint64_t hint, async_simple::Slot *slot = nullptr) override { - auto timer = std::make_shared(executor_, dur); - timer->async_wait([fn = std::move(func), timer](const auto &ec) { - fn(); - }); - if (!async_simple::signalHelper{async_simple::SignalType::Terminate} - .tryEmplace(slot, [timer](auto signalType, auto *signal) mutable { - asio::dispatch(timer->get_executor(), [timer]() { - timer->cancel(); - }); - })) { - timer->cancel(); + auto timer = + std::make_shared>>( + asio::steady_timer{executor_, dur}, false); + if (!slot) { + timer->first.async_wait([fn = std::move(func), timer](const auto &ec) { + fn(); + }); + } + else { + if (!async_simple::signalHelper{async_simple::SignalType::Terminate} + .tryEmplace( + slot, [timer](auto signalType, auto *signal) mutable { + if (bool expected = false; + !timer->second.compare_exchange_strong( + expected, true, std::memory_order_release)) { + timer->first.cancel(); + } + })) { + asio::dispatch(timer->first.get_executor(), func); + } + else { + timer->first.async_wait([fn = std::move(func), timer](const auto &ec) { + fn(); + }); + if (bool expected = false; !timer->second.compare_exchange_strong( + expected, true, std::memory_order_release)) { + timer->first.cancel(); + } + } } } }; diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 71050b0232..cbc40afb8d 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -91,8 +91,8 @@ TEST_CASE("test client pool") { REQUIRE(is_started.hasResult() == false); auto pool = coro_io::client_pool::create( "127.0.0.1:8801", {.max_connection = 100, - .idle_timeout = 300ms, - .short_connect_idle_timeout = 50ms}); + .idle_timeout = 700ms, + .short_connect_idle_timeout = 200ms}); SpinLock lock; ConditionVariable cv; auto res = co_await event(20, *pool, cv, lock); @@ -102,10 +102,10 @@ TEST_CASE("test client pool") { CHECK(res); auto sz = pool->free_client_count(); CHECK(sz == 200); - co_await coro_io::sleep_for(150ms); + co_await coro_io::sleep_for(500ms); sz = pool->free_client_count(); CHECK((sz >= 100 && sz <= 105)); - co_await coro_io::sleep_for(550ms); + co_await coro_io::sleep_for(1000ms); CHECK(pool->free_client_count() == 0); server.stop(); }()); From 9c103480d66f2178cffcbeb761d9609a7c2d250e Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 18:39:27 +0800 Subject: [PATCH 06/13] f --- include/ylt/coro_io/coro_io.hpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 0e3f313a0d..413c86e622 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -110,10 +110,11 @@ class callback_awaitor_base { set_value(std::forward(args)...); resume(); } - template - void set_value(std::error_code ec, Args &&...args) const { + template + void set_value(std::error_code ec, Args &&arg) const { if constexpr (!std::is_same_v) { - obj->arg_ = {std::move(ec), std::forward(args)...}; + std::get<0>(obj->arg_) = std::move(ec); + std::get<1>(obj->arg_) = std::move(arg); } else { obj->arg_ = std::move(ec); @@ -240,13 +241,13 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, [&obj, weak_lock = std::weak_ptr{lock}]( async_simple::SignalType signalType, async_simple::Signal *signal) { - if (auto ptr = weak_lock.lock(); ptr) { - bool expected = false; - if (!ptr->compare_exchange_strong( - expected, true, std::memory_order_release)) { - detail::cancel(obj); - } + if (auto ptr = weak_lock.lock(); ptr) { + bool expected = false; + if (!ptr->compare_exchange_strong( + expected, true, std::memory_order_release)) { + detail::cancel(obj); } + } }); if (hasCanceled) { asio::dispatch(executor, [handler]() { From a103aee697a1e55731e808fb70aee37f43a1cbc9 Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Thu, 23 Jan 2025 18:50:26 +0800 Subject: [PATCH 07/13] f --- include/ylt/coro_io/coro_io.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 413c86e622..b966491160 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -114,7 +114,9 @@ class callback_awaitor_base { void set_value(std::error_code ec, Args &&arg) const { if constexpr (!std::is_same_v) { std::get<0>(obj->arg_) = std::move(ec); - std::get<1>(obj->arg_) = std::move(arg); + if constexpr (requires { std::get<1>(obj->arg_) = std::move(arg); }) { + std::get<1>(obj->arg_) = std::move(arg); + } } else { obj->arg_ = std::move(ec); From 07b2d2317a033bb9abd27cd2a1baf96872f51bdd Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Fri, 24 Jan 2025 16:40:30 +0800 Subject: [PATCH 08/13] 1 --- src/coro_io/tests/test_cancel.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/coro_io/tests/test_cancel.cpp b/src/coro_io/tests/test_cancel.cpp index f39fd399a0..4adae21a7f 100644 --- a/src/coro_io/tests/test_cancel.cpp +++ b/src/coro_io/tests/test_cancel.cpp @@ -125,33 +125,32 @@ async_simple::coro::Lazy test_cancel2(coro_io::io_context_pool& io_pool, } } TEST_CASE("test multithread cancellation") { - // easylog::logger<>::instance().set_min_severity(easylog::Severity::INFO); coro_io::io_context_pool io_pool(101); - ELOG_TRACE << "test multithread cancellation"; + ELOG_WARN << "test multithread cancellation"; std::thread thd{[&]() { - ELOG_TRACE << "start thread pool"; + ELOG_WARN << "start thread pool"; io_pool.run(); }}; auto test = [&](coro_io::ExecutorWrapper<>* e) { for (int sleep_twice = 0; sleep_twice <= 1; ++sleep_twice) { - ELOG_TRACE << "slow work canceled(reverse) test"; + ELOG_WARN << "slow work canceled(reverse) test"; for (int i = 0; i < 100; ++i) { async_simple::coro::syncAwait( test_cancel(io_pool, 10000s, e, sleep_twice, false, true)); } - ELOG_TRACE << "slow work canceled test"; + ELOG_WARN << "slow work canceled test"; for (int i = 0; i < 100; ++i) { async_simple::coro::syncAwait( test_cancel(io_pool, 10000s, e, sleep_twice)); } - ELOG_TRACE << "work finished in 0s test"; + ELOG_WARN << "work finished in 0s test"; for (int i = 0; i < 100; ++i) async_simple::coro::syncAwait(test_cancel(io_pool, 0s, e, sleep_twice)); - ELOG_TRACE << "work finished in same time test"; + ELOG_WARN << "work finished in same time test"; for (int i = 0; i < 20; ++i) async_simple::coro::syncAwait( test_cancel(io_pool, 100ms, e, sleep_twice, true)); - ELOG_TRACE << "work finished in same short time test"; + ELOG_WARN << "work finished in same short time test"; for (int i = 0; i < 100; ++i) async_simple::coro::syncAwait( test_cancel(io_pool, 1ms, e, sleep_twice, true)); From 358c449c581e920bd15554cf621ebf79f6fbbdf3 Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Fri, 24 Jan 2025 18:12:52 +0800 Subject: [PATCH 09/13] fix mem order --- include/ylt/coro_io/coro_io.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index b966491160..56ca7df70a 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -246,7 +246,7 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, if (auto ptr = weak_lock.lock(); ptr) { bool expected = false; if (!ptr->compare_exchange_strong( - expected, true, std::memory_order_release)) { + expected, true, std::memory_order_acq_rel)) { detail::cancel(obj); } } @@ -266,7 +266,7 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, }); bool expected = false; if (!lock->compare_exchange_strong(expected, true, - std::memory_order_release)) { + std::memory_order_acq_rel)) { detail::cancel(obj); } lock = nullptr; From 0b1fd52f829e414482cb82ddc3b1600561ea00fe Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Sun, 26 Jan 2025 14:56:20 +0800 Subject: [PATCH 10/13] fix --- include/ylt/coro_io/coro_io.hpp | 2 +- include/ylt/coro_io/io_context_pool.hpp | 4 +-- include/ylt/metric/summary_impl.hpp | 2 ++ src/coro_io/tests/test_coro_channel.cpp | 42 ++++++++++++++++--------- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 56ca7df70a..6b95ba33b1 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -276,7 +276,7 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, auto weak_lock = std::weak_ptr{lock}; lock = nullptr; // wait cancel finish to make sure io object's life-time - for (; weak_lock.lock();) { + for (; !weak_lock.expired();) { co_await coro_io::post( []() { }, diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index a7e376f70a..6a9e474113 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -119,7 +119,7 @@ class ExecutorWrapper : public async_simple::Executor { slot, [timer](auto signalType, auto *signal) mutable { if (bool expected = false; !timer->second.compare_exchange_strong( - expected, true, std::memory_order_release)) { + expected, true, std::memory_order_acq_rel)) { timer->first.cancel(); } })) { @@ -130,7 +130,7 @@ class ExecutorWrapper : public async_simple::Executor { fn(); }); if (bool expected = false; !timer->second.compare_exchange_strong( - expected, true, std::memory_order_release)) { + expected, true, std::memory_order_acq_rel)) { timer->first.cancel(); } } diff --git a/include/ylt/metric/summary_impl.hpp b/include/ylt/metric/summary_impl.hpp index 081927a4d1..54292368fe 100644 --- a/include/ylt/metric/summary_impl.hpp +++ b/include/ylt/metric/summary_impl.hpp @@ -135,6 +135,8 @@ class summary_impl { if (piece) { if constexpr (inc_order) { for (int j = 0; j < piece->size(); ++j) { + // tsan check data race here is expected. stat dont need to be very + // strict. we allow old value. auto value = (*piece)[j].load(std::memory_order_relaxed); if (value) { result.emplace_back(get_ordered_index(i * piece_size + j), value); diff --git a/src/coro_io/tests/test_coro_channel.cpp b/src/coro_io/tests/test_coro_channel.cpp index ba415dfd69..2d63a3183d 100644 --- a/src/coro_io/tests/test_coro_channel.cpp +++ b/src/coro_io/tests/test_coro_channel.cpp @@ -3,7 +3,11 @@ #include #include #include +#include +#include #include + +#include "async_simple/coro/Lazy.h" using namespace std::chrono_literals; #ifndef __clang__ @@ -34,18 +38,28 @@ async_simple::coro::Lazy test_channel() { CHECK(val == 42); } +async_simple::coro::Lazy> async_receive_wrapper( + std::shared_ptr> ch) { + co_return co_await async_receive(*ch); +} + +async_simple::coro::Lazy wait_wrapper( + std::shared_ptr t) { + co_await t->async_await(); +} async_simple::coro::Lazy test_select_channel() { using namespace coro_io; using namespace async_simple::coro; - auto ch1 = coro_io::create_channel(1000); - auto ch2 = coro_io::create_channel(1000); + auto ch1 = coro_io::create_shared_channel(1000); + auto ch2 = coro_io::create_shared_channel(1000); - co_await async_send(ch1, 41); - co_await async_send(ch2, 42); + co_await async_send(*ch1, 41); + co_await async_send(*ch2, 42); std::array arr{41, 42}; - auto result = co_await collectAny(async_receive(ch1), async_receive(ch2)); + auto result = co_await collectAny(async_receive_wrapper(ch1), + async_receive_wrapper(ch2)); int val = std::visit( [&val](auto& v) { return static_cast(v.value().second); @@ -54,22 +68,22 @@ async_simple::coro::Lazy test_select_channel() { CHECK(val == arr[result.index()]); - co_await async_send(ch1, 41); - co_await async_send(ch2, 42); + co_await async_send(*ch1, 41); + co_await async_send(*ch2, 42); std::vector>> vec; - vec.push_back(async_receive(ch1)); - vec.push_back(async_receive(ch2)); + vec.push_back(async_receive_wrapper(ch1)); + vec.push_back(async_receive_wrapper(ch2)); auto result2 = co_await collectAny(std::move(vec)); val = result2.value().second; CHECK(val == arr[result2.index()]); - period_timer timer1(coro_io::get_global_executor()); - timer1.expires_after(100ms); - period_timer timer2(coro_io::get_global_executor()); - timer2.expires_after(200ms); - auto val1 = co_await collectAny(timer1.async_await(), timer2.async_await()); + auto timer1 = std::make_shared(coro_io::get_global_executor()); + timer1->expires_after(100ms); + auto timer2 = std::make_shared(coro_io::get_global_executor()); + timer2->expires_after(200ms); + auto val1 = co_await collectAny(wait_wrapper(timer1), wait_wrapper(timer2)); CHECK(val1.index() == 0); } From 32b183c2ca2d0ded68e3ecb4dd7315d8b7da4017 Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Sun, 26 Jan 2025 16:11:37 +0800 Subject: [PATCH 11/13] fix mem order --- include/ylt/coro_io/coro_io.hpp | 113 ++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 43 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 6b95ba33b1..4609732aa9 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -221,6 +221,10 @@ void cancel(T &io_object) { template inline async_simple::coro::Lazy async_io(IO_func io_func, io_object &obj) noexcept { + constexpr int no_cancel_flag = 0; + constexpr int could_cancel_flag = 1; + constexpr int start_cancel_flag = 2; + constexpr int finish_cancel_flag = 3; callback_awaitor awaitor; auto slot = co_await async_simple::coro::CurrentSlot{}; if (!slot) { @@ -233,54 +237,77 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, } else { auto executor = obj.get_executor(); - auto lock = std::make_shared>(); + auto lock = std::make_shared>(); bool hasCanceled; - auto result = co_await awaitor.await_resume( - [&, &lock_ref = lock](auto handler) mutable { - auto lock = lock_ref; - hasCanceled = !slot->emplace( - async_simple::SignalType::Terminate, - [&obj, weak_lock = std::weak_ptr{lock}]( - async_simple::SignalType signalType, - async_simple::Signal *signal) { - if (auto ptr = weak_lock.lock(); ptr) { - bool expected = false; - if (!ptr->compare_exchange_strong( - expected, true, std::memory_order_acq_rel)) { - detail::cancel(obj); - } - } - }); - if (hasCanceled) { - asio::dispatch(executor, [handler]() { - handler.set_value( - std::make_error_code(std::errc::operation_canceled)); - handler.resume(); - }); - } - else { - io_func([&, handler](auto &&...args) mutable { - slot->clear(async_simple::Terminate); - handler.set_value(std::forward(args)...); - handler.resume(); - }); - bool expected = false; - if (!lock->compare_exchange_strong(expected, true, + auto result = co_await awaitor.await_resume([&, &lock_ref = lock]( + auto handler) mutable { + auto lock = lock_ref; + hasCanceled = !slot->emplace( + async_simple::SignalType::Terminate, + [&obj, lock](async_simple::SignalType signalType, + async_simple::Signal *signal) { + int expected = no_cancel_flag; + if (!lock->compare_exchange_strong(expected, could_cancel_flag, std::memory_order_acq_rel)) { - detail::cancel(obj); + if (expected == could_cancel_flag) { + if (lock->compare_exchange_strong(expected, start_cancel_flag, + std::memory_order_release)) { + obj.cancel(); + lock->store(finish_cancel_flag, std::memory_order_release); + } + } } - lock = nullptr; - } + }); + if (hasCanceled) { + asio::dispatch(executor, [handler]() { + handler.set_value( + std::make_error_code(std::errc::operation_canceled)); + handler.resume(); + }); + } + else { + io_func([&, handler](auto &&...args) mutable { + slot->clear(async_simple::Terminate); + handler.set_value(std::forward(args)...); + handler.resume(); }); + int expected = no_cancel_flag; + if (!lock->compare_exchange_strong(expected, could_cancel_flag, + std::memory_order_acq_rel)) { + if (expected == could_cancel_flag) { + if (lock->compare_exchange_strong(expected, start_cancel_flag, + std::memory_order_release)) { + obj.cancel(); + lock->store(finish_cancel_flag, std::memory_order_release); + } + } + } + } + }); if (!hasCanceled) { - auto weak_lock = std::weak_ptr{lock}; - lock = nullptr; - // wait cancel finish to make sure io object's life-time - for (; !weak_lock.expired();) { - co_await coro_io::post( - []() { - }, - executor); + int expected = no_cancel_flag; + if (!lock->compare_exchange_strong(expected, finish_cancel_flag, + std::memory_order_acq_rel)) { + if (expected != finish_cancel_flag) { + do { + if (expected == could_cancel_flag) { + if (lock->compare_exchange_strong(expected, finish_cancel_flag, + std::memory_order_acq_rel) || + expected == finish_cancel_flag) { + break; + } + } + // flag is start_cancel_flag now. + // wait cancel finish to make sure io object's life-time + for (; + lock->load(std::memory_order_acquire) == start_cancel_flag;) { + co_await coro_io::post( + []() { + }, + executor); + } + } while (0); + } } } co_return result; From 109ac1260d1636ce0d2c33781841433cd31fddbf Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Sun, 26 Jan 2025 16:19:11 +0800 Subject: [PATCH 12/13] fix --- include/ylt/coro_io/coro_io.hpp | 47 +++++++++++++++++---------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 4609732aa9..a19a33aaff 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -216,15 +216,16 @@ void cancel(T &io_object) { io_object.lowest_layer().cancel(); } } +constexpr int no_cancel_flag = 0; +constexpr int could_cancel_flag = 1; +constexpr int start_cancel_flag = 2; +constexpr int finish_cancel_flag = 3; } // namespace detail template inline async_simple::coro::Lazy async_io(IO_func io_func, io_object &obj) noexcept { - constexpr int no_cancel_flag = 0; - constexpr int could_cancel_flag = 1; - constexpr int start_cancel_flag = 2; - constexpr int finish_cancel_flag = 3; + callback_awaitor awaitor; auto slot = co_await async_simple::coro::CurrentSlot{}; if (!slot) { @@ -246,14 +247,14 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, async_simple::SignalType::Terminate, [&obj, lock](async_simple::SignalType signalType, async_simple::Signal *signal) { - int expected = no_cancel_flag; - if (!lock->compare_exchange_strong(expected, could_cancel_flag, + int expected = detail::no_cancel_flag; + if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag, std::memory_order_acq_rel)) { - if (expected == could_cancel_flag) { - if (lock->compare_exchange_strong(expected, start_cancel_flag, + if (expected == detail::could_cancel_flag) { + if (lock->compare_exchange_strong(expected, detail::start_cancel_flag, std::memory_order_release)) { - obj.cancel(); - lock->store(finish_cancel_flag, std::memory_order_release); + detail::cancel(obj); + lock->store(detail::finish_cancel_flag, std::memory_order_release); } } } @@ -271,36 +272,36 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, handler.set_value(std::forward(args)...); handler.resume(); }); - int expected = no_cancel_flag; - if (!lock->compare_exchange_strong(expected, could_cancel_flag, + int expected = detail::no_cancel_flag; + if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag, std::memory_order_acq_rel)) { - if (expected == could_cancel_flag) { - if (lock->compare_exchange_strong(expected, start_cancel_flag, + if (expected == detail::could_cancel_flag) { + if (lock->compare_exchange_strong(expected, detail::start_cancel_flag, std::memory_order_release)) { - obj.cancel(); - lock->store(finish_cancel_flag, std::memory_order_release); + detail::cancel(obj); + lock->store(detail::finish_cancel_flag, std::memory_order_release); } } } } }); if (!hasCanceled) { - int expected = no_cancel_flag; - if (!lock->compare_exchange_strong(expected, finish_cancel_flag, + int expected = detail::no_cancel_flag; + if (!lock->compare_exchange_strong(expected, detail::finish_cancel_flag, std::memory_order_acq_rel)) { - if (expected != finish_cancel_flag) { + if (expected != detail::finish_cancel_flag) { do { - if (expected == could_cancel_flag) { - if (lock->compare_exchange_strong(expected, finish_cancel_flag, + if (expected == detail::could_cancel_flag) { + if (lock->compare_exchange_strong(expected, detail::finish_cancel_flag, std::memory_order_acq_rel) || - expected == finish_cancel_flag) { + expected == detail::finish_cancel_flag) { break; } } // flag is start_cancel_flag now. // wait cancel finish to make sure io object's life-time for (; - lock->load(std::memory_order_acquire) == start_cancel_flag;) { + lock->load(std::memory_order_acquire) == detail::start_cancel_flag;) { co_await coro_io::post( []() { }, From deb556691c4a054bd33fb719348ea70d05c3a95d Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Sun, 26 Jan 2025 16:38:49 +0800 Subject: [PATCH 13/13] fix format --- include/ylt/coro_io/coro_io.hpp | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index a19a33aaff..c66e51e71f 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -225,7 +225,6 @@ constexpr int finish_cancel_flag = 3; template inline async_simple::coro::Lazy async_io(IO_func io_func, io_object &obj) noexcept { - callback_awaitor awaitor; auto slot = co_await async_simple::coro::CurrentSlot{}; if (!slot) { @@ -248,13 +247,16 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, [&obj, lock](async_simple::SignalType signalType, async_simple::Signal *signal) { int expected = detail::no_cancel_flag; - if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag, + if (!lock->compare_exchange_strong(expected, + detail::could_cancel_flag, std::memory_order_acq_rel)) { if (expected == detail::could_cancel_flag) { - if (lock->compare_exchange_strong(expected, detail::start_cancel_flag, + if (lock->compare_exchange_strong(expected, + detail::start_cancel_flag, std::memory_order_release)) { detail::cancel(obj); - lock->store(detail::finish_cancel_flag, std::memory_order_release); + lock->store(detail::finish_cancel_flag, + std::memory_order_release); } } } @@ -276,10 +278,12 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag, std::memory_order_acq_rel)) { if (expected == detail::could_cancel_flag) { - if (lock->compare_exchange_strong(expected, detail::start_cancel_flag, + if (lock->compare_exchange_strong(expected, + detail::start_cancel_flag, std::memory_order_release)) { detail::cancel(obj); - lock->store(detail::finish_cancel_flag, std::memory_order_release); + lock->store(detail::finish_cancel_flag, + std::memory_order_release); } } } @@ -292,7 +296,8 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, if (expected != detail::finish_cancel_flag) { do { if (expected == detail::could_cancel_flag) { - if (lock->compare_exchange_strong(expected, detail::finish_cancel_flag, + if (lock->compare_exchange_strong(expected, + detail::finish_cancel_flag, std::memory_order_acq_rel) || expected == detail::finish_cancel_flag) { break; @@ -300,8 +305,8 @@ inline async_simple::coro::Lazy async_io(IO_func io_func, } // flag is start_cancel_flag now. // wait cancel finish to make sure io object's life-time - for (; - lock->load(std::memory_order_acquire) == detail::start_cancel_flag;) { + for (; lock->load(std::memory_order_acquire) == + detail::start_cancel_flag;) { co_await coro_io::post( []() { },