diff --git a/src/core/aio.c b/src/core/aio.c index 5807869b8..bb8347dd7 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -780,15 +780,21 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } } -static void +static bool nni_aio_expire_q_stop(nni_aio_expire_q *eq) { - if (eq != NULL && !eq->eq_stop) { + bool result = false; + if (eq != NULL) { nni_mtx_lock(&eq->eq_mtx); eq->eq_stop = true; nni_cv_wake(&eq->eq_cv); + while (!nni_list_empty(&eq->eq_list)) { + result = true; + nni_cv_wait(&eq->eq_cv); + } nni_mtx_unlock(&eq->eq_mtx); } + return (result); } static void @@ -834,12 +840,16 @@ nni_aio_expire_q_alloc(void) return (eq); } -void -nni_aio_sys_stop(void) +bool +nni_aio_sys_drain(void) { + bool result = false; for (int i = 0; i < nni_aio_expire_q_cnt; i++) { - nni_aio_expire_q_stop(nni_aio_expire_q_list[i]); + if (nni_aio_expire_q_stop(nni_aio_expire_q_list[i])) { + result = true; + } } + return (result); } void diff --git a/src/core/aio.h b/src/core/aio.h index f8c6730f0..f56d2f587 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -193,7 +193,7 @@ extern void nni_aio_completions_add( nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(nng_init_params *); -extern void nni_aio_sys_stop(void); +extern bool nni_aio_sys_drain(void); extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; diff --git a/src/core/init.c b/src/core/init.c index fa07919e9..4b31995a3 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -133,13 +133,18 @@ nng_fini(void) nni_atomic_flag_reset(&init_busy); return; } - nni_aio_sys_stop(); // no more scheduling allowed! nni_sock_closeall(); nni_sp_tran_sys_fini(); + + // Drain everything. This is important because some of + // these subsystems can dispatch things to other ones. + // So we need them *all* to be empty before proceeding. + while ((nni_aio_sys_drain() || nni_taskq_sys_drain() || + nni_reap_sys_drain())) { + continue; + } nni_tls_sys_fini(); - nni_reap_drain(); nni_taskq_sys_fini(); - nni_reap_drain(); nni_aio_sys_fini(); nni_id_map_sys_fini(); nni_reap_sys_fini(); // must be near the end diff --git a/src/core/reap.c b/src/core/reap.c index 3f182205c..f996695dc 100644 --- a/src/core/reap.c +++ b/src/core/reap.c @@ -18,9 +18,9 @@ static nni_reap_list *reap_list = NULL; static nni_thr reap_thr; static bool reap_exit = false; -static nni_mtx reap_mtx = NNI_MTX_INITIALIZER; +static nni_mtx reap_mtx = NNI_MTX_INITIALIZER; static bool reap_empty; -static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx); +static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx); static nni_cv reap_empty_cv = NNI_CV_INITIALIZER(&reap_mtx); static void @@ -90,14 +90,17 @@ nni_reap(nni_reap_list *rl, void *item) nni_mtx_unlock(&reap_mtx); } -void -nni_reap_drain(void) +bool +nni_reap_sys_drain(void) { + bool result = false; nni_mtx_lock(&reap_mtx); while (!reap_empty) { + result = true; nni_cv_wait(&reap_empty_cv); } nni_mtx_unlock(&reap_mtx); + return (result); } int diff --git a/src/core/reap.h b/src/core/reap.h index 5f6318859..221fd17f0 100644 --- a/src/core/reap.h +++ b/src/core/reap.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -56,8 +56,9 @@ struct nni_reap_list { extern void nni_reap(nni_reap_list *, void *); -// nni_reap_drain waits for the reap queue to be drained. -extern void nni_reap_drain(void); +// nni_reap_sys_drain waits for the reap queue to be drained. +// It returns true if it found anything to wait for. +extern bool nni_reap_sys_drain(void); extern int nni_reap_sys_init(void); extern void nni_reap_sys_fini(void); diff --git a/src/core/taskq.c b/src/core/taskq.c index 496c2fabe..1f0ae1b6b 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -59,6 +59,7 @@ nni_taskq_thread(void *self) continue; } + nni_cv_wake(&tq->tq_wait_cv); if (!tq->tq_run) { break; } @@ -127,6 +128,19 @@ nni_taskq_fini(nni_taskq *tq) NNI_FREE_STRUCT(tq); } +bool +nni_taskq_drain(nni_taskq *tq) +{ + bool result = false; + nni_mtx_lock(&tq->tq_mtx); + while (!nni_list_empty(&tq->tq_tasks)) { + result = true; + nni_cv_wait(&tq->tq_wait_cv); + } + nni_mtx_unlock(&tq->tq_mtx); + return (result); +} + void nni_task_exec(nni_task *task) { @@ -263,6 +277,12 @@ nni_taskq_sys_init(nng_init_params *params) return (nni_taskq_init(&nni_taskq_systq, (int) num_thr)); } +bool +nni_taskq_sys_drain(void) +{ + return (nni_taskq_drain(nni_taskq_systq)); +} + void nni_taskq_sys_fini(void) { diff --git a/src/core/taskq.h b/src/core/taskq.h index 498b4f374..d32994334 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -62,6 +62,7 @@ extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *); extern void nni_task_fini(nni_task *); extern int nni_taskq_sys_init(nng_init_params *); +extern bool nni_taskq_sys_drain(void); extern void nni_taskq_sys_fini(void); // nni_task implementation details are not to be used except by the diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index a09d120c4..cb826308e 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -295,7 +295,7 @@ nni_win_ipc_sysfini(void) { ipc_dial_work *worker = &ipc_connector; - nni_reap_drain(); // so that listeners get cleaned up. + nni_reap_sys_drain(); // so that listeners get cleaned up. nni_mtx_lock(&worker->mtx); worker->exit = 1;