Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: epoll socket async chaining and refactor #374

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 48 additions & 55 deletions util/fibers/epoll_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void EpollSocket::PendingReq::Activate(error_code ec) {
ActivateSameThread(detail::FiberActive(), context_);
}

bool EpollSocket::AsyncReq::Run(int fd, bool is_send) {
std::pair<bool, EpollSocket::Result<size_t>> EpollSocket::AsyncReq::Run(int fd, bool is_send) {
msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = vec;
Expand All @@ -166,22 +166,18 @@ bool EpollSocket::AsyncReq::Run(int fd, bool is_send) {
res = is_send ? sendmsg(fd, &msg, MSG_NOSIGNAL) : recvmsg(fd, &msg, 0);

if (res > 0) {
cb(res);
return true;
return {true, res};
romange marked this conversation as resolved.
Show resolved Hide resolved
}

if (res == 0) {
CHECK(!is_send); // can only happen with recvmsg
cb(MakeUnexpected(errc::connection_aborted));
return true;
return {true, MakeUnexpected(errc::connection_aborted)};
}

if (errno == EAGAIN)
return false;
return {false, 0};

error_code ec = from_errno();
cb(make_unexpected(ec));
return true;
return {true, make_unexpected(from_errno())};
}

EpollSocket::EpollSocket(int fd)
Expand Down Expand Up @@ -381,8 +377,11 @@ void EpollSocket::AsyncWriteSome(const iovec* v, uint32_t len, io::AsyncProgress
CHECK(async_write_req_ == nullptr); // we do not allow queuing multiple async requests.

AsyncReq req{const_cast<iovec*>(v), len, std::move(cb)};
if (req.Run(native_handle(), true))
auto [completed, result] = req.Run(native_handle(), true);
if (completed) {
req.cb(result);
return;
}

async_write_req_ = new AsyncReq(std::move(req));
async_write_pending_ = 1;
Expand Down Expand Up @@ -568,6 +567,41 @@ void EpollSocket::CancelOnErrorCb() {
error_cb_ = {};
}

void EpollSocket::HandleAsyncRequest(error_code ec, bool is_send) {
auto async_pending = is_send ? async_write_pending_ : async_read_pending_;
if (async_pending) {
auto& async_request = is_send ? async_write_req_ : async_read_req_;
DCHECK(async_request);

auto finalize_and_fetch_cb = [this, &async_request, is_send]() {
auto cb = std::move(async_request->cb);
delete async_request;
async_request = nullptr;
if (is_send)
async_write_pending_ = 0;
else
async_read_pending_ = 0;
return cb;
};

if (ec) {
auto cb = finalize_and_fetch_cb();
cb(make_unexpected(ec));
} else if (auto res = async_request->Run(native_handle(), is_send); res.first) {
auto cb = finalize_and_fetch_cb();
cb(res.second);
}
} else {
auto& sync_request = is_send ? write_req_ : read_req_;
// It could be that we activated context already, but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (sync_request && sync_request->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule read in " << sync_request->name();
sync_request->Activate(ec);
}
}
}

void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) {
DVSOCK(2) << "Wakey " << ev_mask;
#ifdef __linux__
Expand All @@ -584,54 +618,13 @@ void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) {
}

if (ev_mask & (EpollProactor::EPOLL_IN | kErrMask)) {
if (async_read_pending_) {
DCHECK(async_read_req_);

auto finalize = [this] {
delete async_read_req_;
async_read_req_ = nullptr;
async_read_pending_ = 0;
};
if (ec) {
async_read_req_->cb(make_unexpected(ec));
finalize();
} else if (async_read_req_->Run(native_handle(), false)) {
finalize();
}
} else {
// It could be that we activated context already, but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (read_req_ && read_req_->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule read in " << read_req_->name();
read_req_->Activate(ec);
}
}
bool is_send = false;
HandleAsyncRequest(ec, is_send);
}

if (ev_mask & (EpollProactor::EPOLL_OUT | kErrMask)) {
if (async_write_pending_) {
DCHECK(async_write_req_);

auto finalize = [this] {
delete async_write_req_;
async_write_req_ = nullptr;
async_write_pending_ = 0;
};

if (ec) {
async_write_req_->cb(make_unexpected(ec));
finalize();
} else if (async_write_req_->Run(native_handle(), true)) {
finalize();
}
} else {
// It could be that we activated context already but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (write_req_ && write_req_->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule write in " << write_req_->name();
write_req_->Activate(ec);
}
}
bool is_send = true;
HandleAsyncRequest(ec, is_send);
}

if (error_cb_ && (ev_mask & kErrMask)) {
Expand Down
8 changes: 6 additions & 2 deletions util/fibers/epoll_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <utility>

#include "util/fiber_socket_base.h"
#include "util/fibers/epoll_proactor.h"

Expand Down Expand Up @@ -54,8 +56,8 @@ class EpollSocket : public LinuxSocketBase {
AsyncReq(iovec* v, uint32_t l, io::AsyncProgressCb _cb) : len(l), vec(v), cb(std::move(_cb)) {
}

// Returns true if it has been fullfilled.
bool Run(int fd, bool is_send);
// Caller is responsible for *calling* cb.
std::pair<bool, Result<size_t>> Run(int fd, bool is_send);
};

EpollProactor* GetProactor() {
Expand All @@ -67,6 +69,8 @@ class EpollSocket : public LinuxSocketBase {
// kevent pass error code together with completion event.
void Wakey(uint32_t event_flags, int error, EpollProactor* cntr);

void HandleAsyncRequest(error_code ec, bool is_send);

union {
PendingReq* write_req_;
AsyncReq* async_write_req_;
Expand Down
Loading