Skip to content

Commit

Permalink
Merge pull request #36 from qchateau/bugfix
Browse files Browse the repository at this point in the history
Bugfix
  • Loading branch information
qchateau authored Jun 25, 2020
2 parents 9e49742 + 9f1bc2a commit 66e336c
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions include/packio/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class client : public std::enable_shared_from_this<client<Socket, Map>> {
auto ec = make_error_code(error::cancelled);
self->async_call_handler(
id, internal::make_msgpack_object(ec.message()), ec);
self->maybe_stop_reading();
});
}

Expand All @@ -100,7 +99,6 @@ class client : public std::enable_shared_from_this<client<Socket, Map>> {
internal::make_msgpack_object(ec.message()),
ec);
}
self->maybe_stop_reading();
});
}

Expand Down Expand Up @@ -205,19 +203,18 @@ class client : public std::enable_shared_from_this<client<Socket, Map>> {
template <typename Buffer, typename WriteHandler>
void async_send(std::unique_ptr<Buffer> buffer_ptr, WriteHandler&& handler)
{
wstrand_.push([this,
self = shared_from_this(),
wstrand_.push([self = shared_from_this(),
buffer_ptr = std::move(buffer_ptr),
handler = std::forward<WriteHandler>(handler)]() mutable {
using internal::buffer;

internal::set_no_delay(socket_);
internal::set_no_delay(self->socket_);

auto buf = buffer(*buffer_ptr);
net::async_write(
socket_,
self->socket_,
buf,
[self = std::move(self),
[self,
buffer_ptr = std::move(buffer_ptr),
handler = std::forward<WriteHandler>(handler)](
error_code ec, size_t length) mutable {
Expand Down Expand Up @@ -302,25 +299,27 @@ class client : public std::enable_shared_from_this<client<Socket, Map>> {
void async_call_handler(id_type id, msgpack::object_handle result, error_code ec)
{
net::dispatch(
call_strand_, [this, ec, id, result = std::move(result)]() mutable {
call_strand_,
[ec, id, self = shared_from_this(), result = std::move(result)]() mutable {
PACKIO_DEBUG("calling handler for id: {}", id);

assert(call_strand_.running_in_this_thread());
auto it = pending_.find(id);
if (it == pending_.end()) {
auto it = self->pending_.find(id);
if (it == self->pending_.end()) {
PACKIO_WARN("unexisting id");
return;
}

auto handler = std::move(it->second);
pending_.erase(it);
self->pending_.erase(it);
self->maybe_stop_reading();

// handle the response asynchronously (post)
// to schedule the next read immediately
// this will allow parallel response handling
// in multi-threaded environments
net::post(
socket_.get_executor(),
self->socket_.get_executor(),
[ec,
handler = std::move(handler),
result = std::move(result)]() mutable {
Expand Down

0 comments on commit 66e336c

Please sign in to comment.