Skip to content

Commit

Permalink
udp: implement endpoint stop functions
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 9, 2024
1 parent bd0ea80 commit 8fb78c2
Showing 1 changed file with 43 additions and 26 deletions.
69 changes: 43 additions & 26 deletions src/sp/transport/udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1102,31 +1102,6 @@ udp_ep_fini(void *arg)
{
udp_ep *ep = arg;

// We optionally linger a little bit (up to a half second)
// so that the disconnect messages can get pushed out. On
// most systems this should only take a single millisecond.
nni_time linger =
nni_clock() + NNI_SECOND / 2; // half second to drain, max
nni_mtx_lock(&ep->mtx);
ep->fini = true;
while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) {
nni_mtx_unlock(&ep->mtx);
nng_msleep(1);
nni_mtx_lock(&ep->mtx);
}
if (ep->tx_ring.count > 0) {
nng_log_warn("NNG-UDP-LINGER",
"Lingering timed out on endpoint close, peer "
"notifications dropped");
}
nni_mtx_unlock(&ep->mtx);
nni_aio_close(&ep->timeaio);
nni_aio_close(&ep->resaio);
nni_aio_close(&ep->tx_aio);
nni_aio_close(&ep->rx_aio);
if (ep->udp != NULL) {
nni_udp_close(ep->udp);
}
nni_aio_fini(&ep->timeaio);
nni_aio_fini(&ep->resaio);
nni_aio_fini(&ep->tx_aio);
Expand All @@ -1147,14 +1122,54 @@ udp_ep_close(void *arg)
udp_ep *ep = arg;
nni_aio *aio;

nni_aio_close(&ep->resaio);
nni_aio_close(&ep->rx_aio);
nni_aio_close(&ep->timeaio);

// leave tx open so we can send disconnects

nni_mtx_lock(&ep->mtx);
while ((aio = nni_list_first(&ep->connaios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECONNABORTED);
}
nni_mtx_unlock(&ep->mtx);
}

nni_aio_close(&ep->resaio);
static void
udp_ep_stop(void *arg)
{
udp_ep *ep = arg;

nni_aio_stop(&ep->resaio);
nni_aio_stop(&ep->rx_aio);
nni_aio_stop(&ep->timeaio);

// We optionally linger a little bit (up to a half second)
// so that the disconnect messages can get pushed out. On
// most systems this should only take a single millisecond.
nni_time linger =
nni_clock() + NNI_SECOND / 2; // half second to drain, max
nni_mtx_lock(&ep->mtx);
ep->fini = true;
while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) {
nni_mtx_unlock(&ep->mtx);
nng_msleep(1);
nni_mtx_lock(&ep->mtx);
}
if (ep->tx_ring.count > 0) {
nng_log_warn("NNG-UDP-LINGER",
"Lingering timed out on endpoint close, peer "
"notifications dropped");
}
nni_mtx_unlock(&ep->mtx);

// finally close the tx channel
nni_aio_stop(&ep->tx_aio);

if (ep->udp != NULL) {
nni_udp_close(ep->udp);
}
}

// timer handler - sends out additional creqs as needed,
Expand Down Expand Up @@ -1825,6 +1840,7 @@ static nni_sp_dialer_ops udp_dialer_ops = {
.d_fini = udp_ep_fini,
.d_connect = udp_ep_connect,
.d_close = udp_ep_close,
.d_stop = udp_ep_stop,
.d_getopt = udp_dialer_getopt,
.d_setopt = udp_dialer_setopt,
};
Expand All @@ -1836,6 +1852,7 @@ static nni_sp_listener_ops udp_listener_ops = {
.l_bind = udp_ep_bind,
.l_accept = udp_ep_accept,
.l_close = udp_ep_close,
.l_stop = udp_ep_stop,
.l_getopt = udp_listener_getopt,
.l_setopt = udp_listener_setopt,
};
Expand Down

0 comments on commit 8fb78c2

Please sign in to comment.