Skip to content

Commit

Permalink
windows ipc: significant refactor
Browse files Browse the repository at this point in the history
This refactors a lot of the IPC code in the hope of addressing various
hangs on shutdown, etc.  The problem is that named pipes are not
particularly reliable when it comes to aborting a pending ConnectNamedPipe
call.  Additionally, there were some logic errors in our code that
left things rather brittle.

Ultimately, all of this needs to be replaced with UNIX domain sockets,
which are superior in many ways.
  • Loading branch information
gdamore committed Dec 10, 2024
1 parent 2e72f32 commit 2ade67c
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 144 deletions.
13 changes: 8 additions & 5 deletions src/platform/windows/win_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ win_io_handler(void *arg)
int rv;

ok = GetQueuedCompletionStatus(
win_io_h, &cnt, &key, &olpd, INFINITE);
win_io_h, &cnt, &key, &olpd, 5000);

if (olpd == NULL) {
// Completion port closed...
NNI_ASSERT(ok == FALSE);
break;
}

Expand Down Expand Up @@ -124,12 +123,16 @@ nni_win_io_sysfini(void)
HANDLE h;

if ((h = win_io_h) != NULL) {
// send wakeups in case closing the handle doesn't work
for (i = 0; i < win_io_nthr; i++) {
PostQueuedCompletionStatus(h, 0, 0, NULL);
}
CloseHandle(h);
for (i = 0; i < win_io_nthr; i++) {
nni_thr_fini(&win_io_thrs[i]);
}
win_io_h = NULL;
}
for (i = 0; i < win_io_nthr; i++) {
nni_thr_fini(&win_io_thrs[i]);
}

NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr);
}
Expand Down
249 changes: 145 additions & 104 deletions src/platform/windows/win_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "core/aio.h"
#include "core/nng_impl.h"

#include "win_ipc.h"
Expand All @@ -32,11 +33,27 @@ typedef struct ipc_conn {
bool closed;
bool sending;
bool recving;
bool recv_fail;
bool send_fail;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
} ipc_conn;

// ipc_recv_fail puts the receive side of connection c into a failed state.
// It clears the receive-in-progress flag, latches rv in c->recv_rv,
// completes every aio still queued on c->recv_aios with that error, and
// then wakes the connection's condition variable.
// NOTE(review): presumably the caller holds c->mtx -- confirm at call sites.
static void
ipc_recv_fail(ipc_conn *c, int rv)
{
	nni_aio *pending;

	// Latch the failure state before draining the queue.
	c->recv_rv   = rv;
	c->recv_fail = true;
	c->recving   = false;

	// Fail every queued receive with the same error code.
	for (;;) {
		pending = nni_list_first(&c->recv_aios);
		if (pending == NULL) {
			break;
		}
		nni_aio_list_remove(pending);
		nni_aio_finish_error(pending, rv);
	}

	nni_cv_wake(&c->cv);
}

static void
ipc_recv_start(ipc_conn *c)
{
Expand All @@ -48,55 +65,53 @@ ipc_recv_start(ipc_conn *c)
DWORD len;
int rv;

while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
if (c->closed) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
continue;
}
if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
nni_cv_wake(&c->cv);
return;
}

nni_aio_get_iov(aio, &naiov, &aiov);
if (c->closed) {
ipc_recv_fail(c, NNG_ECLOSED);
return;
}

idx = 0;
while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
idx++;
}
NNI_ASSERT(idx < naiov);
// Now start a transfer. We assume that only one send can be
// outstanding on a pipe at a time. This is important to avoid
// scrambling the data anyway. Note that Windows named pipes
// do not appear to support scatter/gather, so we have to
// process each element in turn.
buf = aiov[idx].iov_buf;
len = (DWORD) aiov[idx].iov_len;
NNI_ASSERT(buf != NULL);
NNI_ASSERT(len != 0);

// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
if (len > 0x1000000) {
len = 0x1000000;
}
nni_aio_get_iov(aio, &naiov, &aiov);

c->recving = true;
if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
c->recving = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
return;
}
idx = 0;
while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
idx++;
}
NNI_ASSERT(idx < naiov);
// Now start a transfer. We assume that only one send can be
// outstanding on a pipe at a time. This is important to avoid
// scrambling the data anyway. Note that Windows named pipes
// do not appear to support scatter/gather, so we have to
// process each element in turn.
buf = aiov[idx].iov_buf;
len = (DWORD) aiov[idx].iov_len;
NNI_ASSERT(buf != NULL);
NNI_ASSERT(len != 0);

// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
if (len > 0x1000000) {
len = 0x1000000;
}

c->recving = true;
if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
ipc_recv_fail(c, nni_win_error(rv));
}
nni_cv_wake(&c->cv);
}

static void
ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_aio *aio;
ipc_conn *c = io->ptr;

nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
Expand All @@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
rv = NNG_ECONNSHUT;
}
c->recving = false;
if (rv != 0) {
ipc_recv_fail(c, nni_win_error(rv));
nni_mtx_unlock(&c->mtx);
return;
}
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);

nni_aio_finish_sync(aio, rv, num);
// nni_aio_finish_sync(aio, rv, num);
nni_aio_finish(aio, rv, num);
}

static void
Expand Down Expand Up @@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
if (c->recv_fail) {
rv = c->recv_rv;
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
}
if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
Expand All @@ -165,6 +192,21 @@ ipc_recv(void *arg, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}

// ipc_send_fail puts the send side of connection c into a failed state.
// It clears the send-in-progress flag, latches rv in c->send_rv,
// completes every aio still queued on c->send_aios with that error, and
// then wakes the connection's condition variable.
// NOTE(review): presumably the caller holds c->mtx -- confirm at call sites.
static void
ipc_send_fail(ipc_conn *c, int rv)
{
	nni_aio *pending;

	// Latch the failure state before draining the queue.
	c->send_rv   = rv;
	c->send_fail = true;
	c->sending   = false;

	// Fail every queued send with the same error code.
	for (;;) {
		pending = nni_list_first(&c->send_aios);
		if (pending == NULL) {
			break;
		}
		nni_aio_list_remove(pending);
		nni_aio_finish_error(pending, rv);
	}

	nni_cv_wake(&c->cv);
}

static void
ipc_send_start(ipc_conn *c)
{
Expand All @@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c)
DWORD len;
int rv;

while ((aio = nni_list_first(&c->send_aios)) != NULL) {
if ((aio = nni_list_first(&c->send_aios)) == NULL) {
nni_cv_wake(&c->cv);
return;
}

nni_aio_get_iov(aio, &naiov, &aiov);
if (c->closed) {
ipc_send_fail(c, NNG_ECLOSED);
return;
}

idx = 0;
while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
idx++;
}
NNI_ASSERT(idx < naiov);
// Now start a transfer. We assume that only one send can be
// outstanding on a pipe at a time. This is important to avoid
// scrambling the data anyway. Note that Windows named pipes
// do not appear to support scatter/gather, so we have to
// process each element in turn.
buf = aiov[idx].iov_buf;
len = (DWORD) aiov[idx].iov_len;
NNI_ASSERT(buf != NULL);
NNI_ASSERT(len != 0);

// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
if (len > 0x1000000) {
len = 0x1000000;
}
nni_aio_get_iov(aio, &naiov, &aiov);

c->sending = true;
if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
c->sending = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
return;
}
idx = 0;
while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
idx++;
}
NNI_ASSERT(idx < naiov);
// Now start a transfer. We assume that only one send can be
// outstanding on a pipe at a time. This is important to avoid
// scrambling the data anyway. Note that Windows named pipes
// do not appear to support scatter/gather, so we have to
// process each element in turn.
buf = aiov[idx].iov_buf;
len = (DWORD) aiov[idx].iov_len;
NNI_ASSERT(buf != NULL);
NNI_ASSERT(len != 0);

// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
if (len > 0x1000000) {
len = 0x1000000;
}

c->sending = true;
if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
ipc_send_fail(c, nni_win_error(rv));
}
nni_cv_wake(&c->cv);
}

static void
Expand Down Expand Up @@ -284,6 +328,8 @@ ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
nni_aio *aio;

nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
Expand All @@ -294,58 +340,53 @@ ipc_close(void *arg)
if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
DisconnectNamedPipe(f);
CloseHandle(f);
}
}
now = nni_clock();
// wait up to a maximum of 10 seconds before assuming something is
// badly amiss. from what we can tell, this doesn't happen, and we do
// see the timer expire properly, but this safeguard can prevent a
// hang.
while ((c->recving || c->sending) &&
((nni_clock() - now) < (NNI_SECOND * 10))) {
nni_mtx_unlock(&c->mtx);
nni_msleep(1);
nni_mtx_lock(&c->mtx);
if ((aio = nni_list_first(&c->send_aios)) != NULL) {
nni_aio_abort(aio, NNG_ECLOSED);
}
if ((aio = nni_list_first(&c->recv_aios)) != NULL) {
nni_aio_abort(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&c->mtx);
}

static void
ipc_conn_reap(void *arg)
ipc_free(void *arg)
{
ipc_conn *c = arg;
nni_aio *aio;
HANDLE f = c->f;
int loop = 0;

nni_mtx_lock(&c->mtx);
while ((!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
// time for callbacks to fire/drain.
nni_time when = nng_clock() + 5000;
while (c->sending || c->recving) {
if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) {
nng_log_err("NNG-WIN-IPC",
"Timeout waiting for operations to cancel");
break;
}
}
// These asserts are for debug, we should never see it.
// If we do then something bad happened.
NNI_ASSERT(!c->sending);
NNI_ASSERT(!c->recving);
NNI_ASSERT(nni_list_empty(&c->recv_aios));
NNI_ASSERT(nni_list_empty(&c->send_aios));
nni_mtx_unlock(&c->mtx);

if (c->f != INVALID_HANDLE_VALUE) {
CloseHandle(c->f);
if (f != INVALID_HANDLE_VALUE) {
DisconnectNamedPipe(f);
CloseHandle(f);
}

nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}

static nni_reap_list ipc_reap_list = {
.rl_offset = offsetof(ipc_conn, reap),
.rl_func = ipc_conn_reap,
};

static void
ipc_free(void *arg)
{
ipc_conn *c = arg;
ipc_close(c);

nni_reap(&ipc_reap_list, c);
}

static int
ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
{
Expand Down
Loading

0 comments on commit 2ade67c

Please sign in to comment.