Skip to content

Commit

Permalink
aio: separate stop / shutdown from fini (deallocate)
Browse files Browse the repository at this point in the history
Probably other subsystems should get the same treatment.  We need
to basically start the process of shutting down so that subsystems
know to cease operation before we rip memory out from underneath them.

This ensures that no new operations can be started as well, once we
have begun the process of teardown.

We also enhanced the completion of sleep to avoid some extra locking
contention, since the expiration *is* the completion.

Includes a test for this case.
  • Loading branch information
gdamore committed Dec 7, 2024
1 parent a02b1c7 commit 8fa3b2a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 14 deletions.
56 changes: 44 additions & 12 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//

#include "core/nng_impl.h"
#include "core/taskq.h"
#include <string.h>

struct nni_aio_expire_q {
Expand All @@ -18,6 +19,7 @@ struct nni_aio_expire_q {
nni_thr eq_thr;
nni_time eq_next; // next expiration
bool eq_exit;
bool eq_stop;
};

static nni_aio_expire_q **nni_aio_expire_q_list;
Expand Down Expand Up @@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;

// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_exit) {
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
Expand Down Expand Up @@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}

nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_exit) {
if (aio->a_stop || eq->eq_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
Expand Down Expand Up @@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg)
nni_mtx_unlock(mtx);
return;
}
if (now < next) {
// Early wake up (just to reschedule), no need to
// rescan the list. This is an optimization.
if (now < next && !(q->eq_stop && aio != NULL)) {
// nothing to do!
nni_cv_until(cv, next);
continue;
}
q->eq_next = NNI_TIME_NEVER;
exp_idx = 0;
while (aio != NULL) {
if ((aio->a_expire < now) &&
if ((q->eq_stop || aio->a_expire < now) &&
(exp_idx < NNI_EXPIRE_BATCH)) {
nni_aio *nxt;

Expand All @@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg)

for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
if (q->eq_stop) {
rv = NNG_ECANCELED;
} else if (aio->a_expire_ok) {
aio->a_expire_ok = false;
rv = 0;
} else {
rv = NNG_ETIMEDOUT;
}

nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
void *cancel_arg = aio->a_cancel_arg;
Expand All @@ -639,18 +647,22 @@ nni_aio_expire_loop(void *arg)
// If there is no cancellation function, then we cannot
// terminate the aio - we've tried, but it has to run
// to its natural conclusion.
if (cancel_fn != NULL) {
//
// For the special case of sleeping, we don't need to
// drop the lock and call the cancel function, we are
// already doing it right here!
if (aio->a_sleep) {
aio->a_result = rv;
aio->a_sleep = false;
nni_task_dispatch(&aio->a_task);
} else if (cancel_fn != NULL) {
nni_mtx_unlock(mtx);
cancel_fn(aio, cancel_arg, rv);
nni_mtx_lock(mtx);
}
aio->a_expiring = false;
}
nni_cv_wake(cv);

if (now < q->eq_next) {
nni_cv_until(cv, q->eq_next);
}
}
}

Expand Down Expand Up @@ -768,12 +780,24 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
}

static void
nni_aio_expire_q_stop(nni_aio_expire_q *eq)
{
if (eq != NULL && !eq->eq_stop) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_stop = true;
nni_cv_wake(&eq->eq_cv);
nni_mtx_unlock(&eq->eq_mtx);
}
}

static void
nni_aio_expire_q_free(nni_aio_expire_q *eq)
{
if (eq == NULL) {
return;
}
NNI_ASSERT(eq->eq_stop);
if (!eq->eq_exit) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_exit = true;
Expand Down Expand Up @@ -810,6 +834,14 @@ nni_aio_expire_q_alloc(void)
return (eq);
}

void
nni_aio_sys_stop(void)
{
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
}
}

void
nni_aio_sys_fini(void)
{
Expand Down
1 change: 1 addition & 0 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);

extern int nni_aio_sys_init(nng_init_params *);
extern void nni_aio_sys_stop(void);
extern void nni_aio_sys_fini(void);

typedef struct nni_aio_expire_q nni_aio_expire_q;
Expand Down
42 changes: 42 additions & 0 deletions src/core/aio_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,46 @@ test_sleep_timeout(void)
nng_aio_free(aio);
}

static void
sleep_reap(void *arg)
{
nng_aio *aio = *(nng_aio **) arg;
if (nng_aio_result(aio) != NNG_ECANCELED) {
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
}
nng_aio_reap(aio);
}

static void
test_sleep_fini(void)
{
static nng_aio *aio;
NUTS_TRUE(nng_aio_alloc(&aio, sleep_reap, &aio) == 0);
nng_sleep_aio(20000, aio);
nng_msleep(1);
// intentionally we do not free the aio here. reap should clean it.
nng_fini();
nng_init(NULL); // so that TEST_FINI will reap
}

static void
test_sleep_fini_many(void)
{
#define NIOS 2000
static nng_aio *aios[NIOS];
for (int i = 0; i < NIOS; i++) {
int rv = nng_aio_alloc(&(aios[i]), sleep_reap, &(aios[i]));
if (rv != 0) {
NUTS_ASSERT(rv == 0);
}
}
for (int i = 0; i < NIOS; i++) {
nng_sleep_aio(20000, aios[i]);
}
nng_fini();
nng_init(NULL);
}

void
test_insane_nio(void)
{
Expand Down Expand Up @@ -400,6 +440,8 @@ test_aio_busy(void)
NUTS_TESTS = {
{ "sleep", test_sleep },
{ "sleep timeout", test_sleep_timeout },
{ "sleep fini", test_sleep_fini },
{ "sleep fini many", test_sleep_fini_many },
{ "insane nio", test_insane_nio },
{ "provider cancel", test_provider_cancel },
{ "consumer cancel", test_consumer_cancel },
Expand Down
6 changes: 4 additions & 2 deletions src/core/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,16 @@ nng_fini(void)
nni_atomic_flag_reset(&init_busy);
return;
}
nni_aio_sys_stop(); // no more scheduling allowed!
nni_sock_closeall();
nni_sp_tran_sys_fini();
nni_tls_sys_fini();
nni_reap_drain();
nni_aio_sys_fini();
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_reap_drain();
nni_aio_sys_fini();
nni_id_map_sys_fini();
nni_reap_sys_fini(); // must be near the end
nni_plat_fini();
nni_atomic_flag_reset(&init_busy);
}

0 comments on commit 8fa3b2a

Please sign in to comment.