Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reference counting changes #1951

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9618e85
aio: make sure aio is initialized before certain operations
gdamore Nov 30, 2024
2791cfd
aio: fix hang on shutdown waiting for aio queue to drain
gdamore Nov 29, 2024
eb513b1
ctx: Simplify handling for closed contexts.
gdamore Nov 28, 2024
f238703
fixes #1408 Reference count as a first class type
gdamore Nov 29, 2024
240340a
Pipe protocol data is never null.
gdamore Nov 29, 2024
804f400
Dialer and listener reference count refactor.
gdamore Nov 29, 2024
71c051c
performance: reference counters can use relaxed order when incrementing
gdamore Nov 29, 2024
d490523
Fix failure reference count leak for dialer/listener
gdamore Nov 29, 2024
5afc866
Eliminate s_cv unused condvar.
gdamore Nov 29, 2024
f24585d
Context reference counts
gdamore Nov 29, 2024
f756ed2
reap: thread exits prematurely after reinitialization
gdamore Nov 29, 2024
ce967a1
socket: convert to using reference counts for shutdown
gdamore Nov 29, 2024
af78e5e
nng_fini: Simpler clean up of sockets, add a test.
gdamore Nov 30, 2024
24c5dff
listener: fix leaking listener after nng_listener_close
gdamore Dec 1, 2024
6f939e2
websocket transport: inline the aios
gdamore Dec 1, 2024
5611ce5
websocket: more aio inlining (generic websocket layer)
gdamore Dec 1, 2024
0c447c9
pipe: allocate and destroy pipe transport data with common pipe data
gdamore Dec 2, 2024
c09c27d
aio: make nni_aio_fini safer by not implicitly stopping
gdamore Dec 2, 2024
553fbd6
tests: ipc test valgrind fix for uninitialized stack data
gdamore Dec 2, 2024
6bb06d1
socket transport: No need for a cool down for this transport.
gdamore Dec 2, 2024
1d67b79
aio: fix data race in aio
gdamore Dec 2, 2024
ce3e102
url: add missing nni_aio_stop
gdamore Dec 3, 2024
b369ee8
refcnt: initialize the atomic
gdamore Dec 3, 2024
7e41dd4
aio: debugging support for missing stop
gdamore Dec 3, 2024
da922e5
aio: add missing nni_aio_stop calls for public NNG convenience functions
gdamore Dec 3, 2024
b420175
inproc: add pipe stop.
gdamore Dec 3, 2024
daf06b8
sockfd: constify ops vectors
gdamore Dec 3, 2024
80c87e7
pair1: stop the pipe sooner
gdamore Dec 3, 2024
6ac829f
pipe: simplify because we always have tran and proto data
gdamore Dec 3, 2024
afc09b5
dialer/listener/socket: ensure teardown order prevents use-after-free
gdamore Dec 4, 2024
7c7c537
aio: significant rework
gdamore Dec 4, 2024
9b3be5c
pipes: make separate dialer/listener pipe allocators.
gdamore Dec 4, 2024
cec78b1
udp: hang on close after sending too large
gdamore Dec 4, 2024
bab1969
aio: stop has to wait for expirations to finish
gdamore Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ nng_sources(
protocol.h
reap.c
reap.h
refcnt.c
refcnt.h
sockaddr.c
socket.c
socket.h
Expand Down
99 changes: 46 additions & 53 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
//

#include "core/nng_impl.h"
#include "core/platform.h"
#include "nng/nng.h"
#include <string.h>

struct nni_aio_expire_q {
Expand Down Expand Up @@ -87,38 +89,16 @@
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_expire_q =
nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt];
aio->a_init = true;
}

void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

// This is like aio_close, but we don't want to dispatch
// the task. And unlike aio_stop, we don't want to wait
// for the task. (Because we implicitly do task_fini.)
// We also wait if the aio is being expired.
nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
nni_mtx_unlock(&eq->eq_mtx);

if (fn != NULL) {
fn(aio, arg, NNG_ECLOSED);
} else {
nni_task_abort(&aio->a_task);
if (aio != NULL && aio->a_init) {
NNI_ASSERT(!nni_aio_busy(aio));
nni_task_fini(&aio->a_task);
}

nni_task_fini(&aio->a_task);
}

int
Expand All @@ -138,6 +118,7 @@
nni_aio_free(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_stop(aio);
nni_aio_fini(aio);
NNI_FREE_STRUCT(aio);
}
Expand All @@ -146,7 +127,7 @@
void
nni_aio_reap(nni_aio *aio)
{
if (aio != NULL) {
if (aio != NULL && aio->a_init) {
nni_reap(&aio_reap_list, aio);
}
}
Expand Down Expand Up @@ -179,34 +160,36 @@
void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
aio->a_stop = true;
nni_mtx_unlock(&eq->eq_mtx);

if (fn != NULL) {
fn(aio, arg, NNG_ECANCELED);
} else {
nni_task_abort(&aio->a_task);
}

nni_aio_wait(aio);
NNI_ASSERT(!nni_aio_busy(aio));
}
}

void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
Expand All @@ -222,8 +205,6 @@

if (fn != NULL) {
fn(aio, arg, NNG_ECLOSED);
} else {
nni_task_abort(&aio->a_task);
}
}
}
Expand Down Expand Up @@ -309,7 +290,9 @@
void
nni_aio_wait(nni_aio *aio)
{
nni_task_wait(&aio->a_task);
if (aio != NULL && aio->a_init) {
nni_task_wait(&aio->a_task);
}
}

bool
Expand All @@ -321,26 +304,21 @@
int
nni_aio_begin(nni_aio *aio)
{
// If any of these triggers then the caller has a defect because
// it means that the aio is already in use. This is always
// a bug in the caller. These checks are not technically thread
// safe in the event that they are false. Users of race detectors
// may wish to ignore or suppress these checks.
nni_aio_expire_q *eq = aio->a_expire_q;

nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));

// Some initialization can be done outside the lock, because
// we must have exclusive access to the aio.
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
aio->a_result = 0;
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_result = 0;
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
aio->a_abort_result = 0;

// We should not reschedule anything at this point.
if (aio->a_stop) {
Expand All @@ -367,7 +345,6 @@
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
nni_task_abort(&aio->a_task);
return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
Expand All @@ -380,11 +357,18 @@
}

nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(nni_task_busy(&aio->a_task));
if (aio->a_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
}
if (aio->a_abort) {
int rv = aio->a_abort_result;
aio->a_abort = false;
aio->a_abort_result = 0;
nni_mtx_unlock(&eq->eq_mtx);
return (rv);

Check warning on line 370 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L366-L370

Added lines #L366 - L370 were not covered by tests
}

NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
Expand All @@ -408,19 +392,26 @@
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

NNI_ASSERT(rv > 0);
nni_mtx_lock(&eq->eq_mtx);
nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
if (fn != NULL) {
aio->a_abort = true;
aio->a_abort_result = rv;
}
nni_mtx_unlock(&eq->eq_mtx);

// Stop any I/O at the provider level.
// If this doesn't catch it, it will be reported
// at nni_aio_schedule (if this is to be scheduled),
// or else we have proceeded too far to cancel this operation.
// (In which case it should complete shortly.)
if (fn != NULL) {
fn(aio, arg, rv);
} else {
nni_task_abort(&aio->a_task);
}
}

Expand All @@ -443,9 +434,11 @@
aio->a_msg = msg;
}

aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_use_expire = false;
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_use_expire = false;
aio->a_abort = false;
aio->a_abort_result = 0;
nni_mtx_unlock(&eq->eq_mtx);

if (sync) {
Expand Down Expand Up @@ -601,7 +594,7 @@
nni_cv_until(cv, next);
continue;
}
q->eq_next = NNI_TIME_NEVER;
q->eq_next = q->eq_exit ? nni_clock() : NNI_TIME_NEVER;
exp_idx = 0;
while (aio != NULL) {
if ((aio->a_expire < now) &&
Expand Down
55 changes: 33 additions & 22 deletions src/core/aio.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -13,6 +13,7 @@

#include "core/defs.h"
#include "core/list.h"
#include "core/platform.h"
#include "core/reap.h"
#include "core/taskq.h"
#include "core/thread.h"
Expand All @@ -26,11 +27,14 @@ typedef void (*nni_aio_cancel_fn)(nni_aio *, void *, int);
extern void nni_aio_init(nni_aio *, nni_cb, void *arg);

// nni_aio_fini finalizes an aio object, releasing associated resources.
// It waits for the callback to complete.
// It does NOT wait for the callback to complete, so it is mandatory
// that nni_aio_stop be called first.
extern void nni_aio_fini(nni_aio *);

// nni_aio_reap is used to asynchronously reap the aio. It can
// be called even from the callback of the aio itself.
// Note that this only works with aio objects created
// with nni_aio_alloc.
extern void nni_aio_reap(nni_aio *);

// nni_aio_alloc allocates an aio object and initializes it. The callback
Expand All @@ -42,17 +46,19 @@ extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg);
// nni_aio_free frees the aio, releasing resources (locks)
// associated with it. This is safe to call on zeroed memory.
// This must only be called on an object that was allocated
// with nni_aio_allocate.
// with nni_aio_alloc.
extern void nni_aio_free(nni_aio *aio);

// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
// but also prevents any new operations from starting (nni_aio_start will
// return NNG_ESTATE). This should be called before nni_aio_free(). The
// best pattern is to call nni_aio_stop on all linked aio objects, before
// calling nni_aio_free on any of them. This function will block until any
// callbacks are executed, and therefore it should never be executed
// from a callback itself. (To abort operations without blocking
// use nni_aio_cancel instead.)
// return NNG_ESTATE). This should be called before nni_aio_free(), and MUST
// be called before nni_aio_fini().
//
// The best pattern is to call nni_aio_stop on all linked aio objects, before
// calling nni_aio_free or nni_aio_fini on any of them. This function will
// block until any callbacks are executed, and therefore it should never be
// executed from a callback itself. (To abort operations without blocking use
// nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);

// nni_aio_close closes the aio for further activity. It aborts any in-progress
Expand Down Expand Up @@ -204,16 +210,20 @@ typedef struct nni_aio_expire_q nni_aio_expire_q;
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
nni_task a_task;
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
int a_abort_result; // Reason for abort (result code)
bool a_stop; // Shutting down (no new operations)
bool a_abort; // Aborted (after begin, before schedule)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
bool a_init; // This is initialized

nni_task a_task;

// Read/write operations.
nni_iov a_iov[8];
Expand All @@ -228,14 +238,15 @@ struct nng_aio {
void *a_inputs[4];
void *a_outputs[4];

nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
nni_reap_node a_reap_node;

// Provider-use fields.
nni_aio_cancel_fn a_cancel_fn;
void *a_cancel_arg;
void *a_prov_data;
nni_list_node a_prov_node; // Linkage on provider list.
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
nni_reap_node a_reap_node;
};

#endif // CORE_AIO_H
4 changes: 2 additions & 2 deletions src/core/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ device_fini(void *arg)
for (int i = 0; i < d->num_paths; i++) {
nni_aio_stop(&d->paths[i].aio);
}
nni_sock_rele(d->paths[0].src);
nni_sock_rele(d->paths[0].dst);
NNI_FREE_STRUCT(d);
}

Expand Down Expand Up @@ -97,8 +99,6 @@ device_cb(void *arg)
nni_aio_finish_error(d->user, d->rv);
d->user = NULL;
}
nni_sock_rele(d->paths[0].src);
nni_sock_rele(d->paths[0].dst);

nni_reap(&device_reap, d);
}
Expand Down
Loading
Loading