From f292cc6521f809f523c8fa9e658dfa5422b2b3ab Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 8 Dec 2024 15:44:23 -0800 Subject: [PATCH] ipc transport: convert to using inline data structures --- src/sp/transport/ipc/ipc.c | 267 +++++++++++++++---------------------- 1 file changed, 106 insertions(+), 161 deletions(-) diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c index 803c4b4b4..dadb1909f 100644 --- a/src/sp/transport/ipc/ipc.c +++ b/src/sp/transport/ipc/ipc.c @@ -13,6 +13,7 @@ #include "core/defs.h" #include "core/nng_impl.h" +#include "core/pipe.h" #include "nng/nng.h" // IPC transport. Platform specific IPC operations must be @@ -25,29 +26,27 @@ typedef struct ipc_ep ipc_ep; // ipc_pipe is one end of an IPC connection. struct ipc_pipe { - nng_stream *conn; - uint16_t peer; - uint16_t proto; - size_t rcv_max; - bool closed; - ipc_ep *ep; - nni_pipe *pipe; - nni_list_node node; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t tx_head[1 + sizeof(uint64_t)]; - uint8_t rx_head[1 + sizeof(uint64_t)]; - size_t got_tx_head; - size_t got_rx_head; - size_t want_tx_head; - size_t want_rx_head; - nni_list recv_q; - nni_list send_q; - nni_aio tx_aio; - nni_aio rx_aio; - nni_aio neg_aio; - nni_msg *rx_msg; - nni_mtx mtx; + nng_stream *conn; + uint16_t peer; + uint16_t proto; + size_t rcv_max; + bool closed; + ipc_ep *ep; + nni_pipe *pipe; + nni_list_node node; + uint8_t tx_head[1 + sizeof(uint64_t)]; + uint8_t rx_head[1 + sizeof(uint64_t)]; + size_t got_tx_head; + size_t got_rx_head; + size_t want_tx_head; + size_t want_rx_head; + nni_list recv_q; + nni_list send_q; + nni_aio tx_aio; + nni_aio rx_aio; + nni_aio neg_aio; + nni_msg *rx_msg; + nni_mtx mtx; }; struct ipc_ep { @@ -57,16 +56,15 @@ struct ipc_ep { bool started; bool closed; bool fini; - int ref_cnt; nng_stream_dialer *dialer; nng_stream_listener *listener; + nni_listener *nlistener; + nni_dialer *ndialer; nni_aio *user_aio; - nni_aio *conn_aio; - nni_aio *time_aio; - nni_list busy_pipes; // busy pipes -- ones passed to socket + nni_aio conn_aio; + nni_aio time_aio; nni_list wait_pipes; // pipes waiting to match to socket nni_list nego_pipes; // pipes busy negotiating - nni_reap_node reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; #endif @@ -77,18 +75,6 @@ static void ipc_pipe_recv_start(ipc_pipe *p); static void ipc_pipe_send_cb(void *); static void ipc_pipe_recv_cb(void *); static void ipc_pipe_nego_cb(void *); -static void ipc_pipe_fini(void *); -static void ipc_ep_fini(void *); - -static nni_reap_list ipc_ep_reap_list = { - .rl_offset = offsetof(ipc_ep, reap), - .rl_func = ipc_ep_fini, -}; - -static nni_reap_list ipc_pipe_reap_list = { - .rl_offset = offsetof(ipc_pipe, reap), - .rl_func = ipc_pipe_fini, -}; static void ipc_tran_init(void) @@ -119,11 +105,15 @@ ipc_pipe_close(void *arg) static void ipc_pipe_stop(void *arg) { - ipc_pipe *p = arg; + ipc_pipe *p = arg; + ipc_ep *ep = p->ep; nni_aio_stop(&p->rx_aio); nni_aio_stop(&p->tx_aio); nni_aio_stop(&p->neg_aio); + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); } static int @@ -131,6 +121,12 @@ ipc_pipe_init(void *arg, nni_pipe *pipe) { ipc_pipe *p = arg; p->pipe = pipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); + nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); + nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p); + nni_aio_list_init(&p->send_q); + nni_aio_list_init(&p->recv_q); return (0); } @@ -138,18 +134,8 @@ static void ipc_pipe_fini(void *arg) { ipc_pipe *p = arg; - ipc_ep *ep; ipc_pipe_stop(p); - if ((ep = p->ep) != NULL) { - nni_mtx_lock(&ep->mtx); - nni_list_node_remove(&p->node); - ep->ref_cnt--; - if (ep->fini && (ep->ref_cnt == 0)) { - nni_reap(&ipc_ep_reap_list, ep); - } - nni_mtx_unlock(&ep->mtx); - } nng_stream_free(p->conn); nni_aio_fini(&p->rx_aio); nni_aio_fini(&p->tx_aio); @@ -158,34 +144,6 @@ ipc_pipe_fini(void *arg) nni_msg_free(p->rx_msg); } nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -ipc_pipe_reap(ipc_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - nni_reap(&ipc_pipe_reap_list, p); - } -} - -static int -ipc_pipe_alloc(ipc_pipe **pipe_p) -{ - ipc_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); - nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); - nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p); - nni_aio_list_init(&p->send_q); - nni_aio_list_init(&p->recv_q); - nni_atomic_flag_reset(&p->reaped); - *pipe_p = p; - return (0); } static void @@ -199,10 +157,9 @@ ipc_ep_match(ipc_ep *ep) return; } nni_list_remove(&ep->wait_pipes, p); - nni_list_append(&ep->busy_pipes, p); ep->user_aio = NULL; p->rcv_max = ep->rcv_max; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->pipe); nni_aio_finish(aio, 0, 0); } @@ -282,7 +239,8 @@ ipc_pipe_nego_cb(void *arg) nni_aio_finish_error(user_aio, rv); } nni_mtx_unlock(&ep->mtx); - ipc_pipe_reap(p); + nni_pipe_close(p->pipe); + nni_pipe_rele(p->pipe); } static void @@ -636,8 +594,6 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep) { nni_iov iov; - ep->ref_cnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -668,60 +624,59 @@ ipc_ep_close(void *arg) ipc_ep *ep = arg; ipc_pipe *p; + nni_aio_close(&ep->time_aio); + nni_aio_close(&ep->conn_aio); + nni_mtx_lock(&ep->mtx); ep->closed = true; - nni_aio_close(ep->time_aio); if (ep->dialer != NULL) { nng_stream_dialer_close(ep->dialer); } if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->nego_pipes, p) { - ipc_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->wait_pipes, p) { - ipc_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->busy_pipes, p) { - ipc_pipe_close(p); - } if (ep->user_aio != NULL) { nni_aio_finish_error(ep->user_aio, NNG_ECLOSED); ep->user_aio = NULL; } + NNI_LIST_FOREACH (&ep->nego_pipes, p) { + nni_pipe_close(p->pipe); + } + NNI_LIST_FOREACH (&ep->wait_pipes, p) { + nni_pipe_close(p->pipe); + } nni_mtx_unlock(&ep->mtx); } +static void +ipc_ep_stop(void *arg) +{ + ipc_ep *ep = arg; + + nni_aio_stop(&ep->time_aio); + nni_aio_stop(&ep->conn_aio); +} + static void ipc_ep_fini(void *arg) { ipc_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->ref_cnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); - nni_aio_stop(ep->time_aio); - nni_aio_stop(ep->conn_aio); + nni_aio_fini(&ep->time_aio); + nni_aio_fini(&ep->conn_aio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_free(ep->time_aio); - nni_aio_free(ep->conn_aio); nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); } static void ipc_ep_timer_cb(void *arg) { ipc_ep *ep = arg; + nni_mtx_lock(&ep->mtx); - if (nni_aio_result(ep->time_aio) == 0) { - nng_stream_listener_accept(ep->listener, ep->conn_aio); + if (nni_aio_result(&ep->time_aio) == 0) { + nng_stream_listener_accept(ep->listener, &ep->conn_aio); } nni_mtx_unlock(&ep->mtx); } @@ -730,7 +685,7 @@ static void ipc_ep_accept_cb(void *arg) { ipc_ep *ep = arg; - nni_aio *aio = ep->conn_aio; + nni_aio *aio = &ep->conn_aio; ipc_pipe *p; int rv; nng_stream *conn; @@ -741,18 +696,21 @@ ipc_ep_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = ipc_pipe_alloc(&p)) != 0) { + + if (ep->closed) { + rv = NNG_ECLOSED; nng_stream_free(conn); goto error; } - if (ep->closed) { - ipc_pipe_fini(p); + rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener); + if (rv != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; goto error; } + ipc_pipe_start(p, conn, ep); - nng_stream_listener_accept(ep->listener, ep->conn_aio); + + nng_stream_listener_accept(ep->listener, &ep->conn_aio); nni_mtx_unlock(&ep->mtx); return; @@ -765,16 +723,15 @@ ipc_ep_accept_cb(void *arg) } switch (rv) { - + case NNG_ECLOSED: + break; case NNG_ENOMEM: case NNG_ENOFILES: - nng_sleep_aio(10, ep->time_aio); + nng_sleep_aio(10, &ep->time_aio); break; default: - if (!ep->closed) { - nng_stream_listener_accept(ep->listener, ep->conn_aio); - } + nng_stream_listener_accept(ep->listener, &ep->conn_aio); break; } nni_mtx_unlock(&ep->mtx); @@ -784,56 +741,51 @@ static void ipc_ep_dial_cb(void *arg) { ipc_ep *ep = arg; - nni_aio *aio = ep->conn_aio; + nni_aio *aio = &ep->conn_aio; + nni_aio *uaio; ipc_pipe *p; int rv; nng_stream *conn; + nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto error; } conn = nni_aio_get_output(aio, 0); - if ((rv = ipc_pipe_alloc(&p)) != 0) { + + if (ep->closed) { nng_stream_free(conn); + rv = NNG_ECLOSED; goto error; } - nni_mtx_lock(&ep->mtx); - if (ep->closed) { - ipc_pipe_fini(p); + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; - nni_mtx_unlock(&ep->mtx); goto error; - } else { - ipc_pipe_start(p, conn, ep); } + + ipc_pipe_start(p, conn, ep); nni_mtx_unlock(&ep->mtx); return; error: // Error connecting. We need to pass this straight back // to the user. - nni_mtx_lock(&ep->mtx); - if ((aio = ep->user_aio) != NULL) { + if ((uaio = ep->user_aio) != NULL) { ep->user_aio = NULL; - nni_aio_finish_error(aio, rv); + nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); } -static int -ipc_ep_init(ipc_ep **epp, nni_sock *sock) +static void +ipc_ep_init(ipc_ep *ep, nni_sock *sock, void (*conn_cb)(void *)) { - ipc_ep *ep; - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node); NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node); NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node); + nni_aio_init(&ep->conn_aio, conn_cb, ep); + nni_aio_init(&ep->time_aio, ipc_ep_timer_cb, ep); ep->proto = nni_sock_proto_id(sock); @@ -847,56 +799,44 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock) }; nni_stat_init(&ep->st_rcv_max, &rcv_max_info); #endif - - *epp = ep; - return (0); } static int ipc_ep_init_dialer(void **dp, nng_url *url, nni_dialer *dialer) { - ipc_ep *ep; + ipc_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_dialer_sock(dialer); - if ((rv = ipc_ep_init(&ep, sock)) != 0) { - return (rv); - } + ipc_ep_init(ep, sock, ipc_ep_dial_cb); + ep->ndialer = dialer; - if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_dial_cb, ep)) != 0) || - ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { - ipc_ep_fini(ep); + if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { return (rv); } #ifdef NNG_ENABLE_STATS nni_dialer_add_stat(dialer, &ep->st_rcv_max); #endif - *dp = ep; return (0); } static int ipc_ep_init_listener(void **dp, nng_url *url, nni_listener *listener) { - ipc_ep *ep; + ipc_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_listener_sock(listener); - if ((rv = ipc_ep_init(&ep, sock)) != 0) { - return (rv); - } + ipc_ep_init(ep, sock, ipc_ep_accept_cb); + ep->nlistener = listener; - if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_accept_cb, ep)) != 0) || - ((rv = nni_aio_alloc(&ep->time_aio, ipc_ep_timer_cb, ep)) != 0) || - ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { - ipc_ep_fini(ep); + if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { return (rv); } #ifdef NNG_ENABLE_STATS nni_listener_add_stat(listener, &ep->st_rcv_max); #endif - *dp = ep; return (0); } @@ -939,7 +879,7 @@ ipc_ep_connect(void *arg, nni_aio *aio) return; } ep->user_aio = aio; - nng_stream_dialer_dial(ep->dialer, ep->conn_aio); + nng_stream_dialer_dial(ep->dialer, &ep->conn_aio); nni_mtx_unlock(&ep->mtx); } @@ -1013,7 +953,7 @@ ipc_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; if (!ep->started) { ep->started = true; - nng_stream_listener_accept(ep->listener, ep->conn_aio); + nng_stream_listener_accept(ep->listener, &ep->conn_aio); } else { ipc_ep_match(ep); } @@ -1030,6 +970,7 @@ ipc_pipe_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) } static nni_sp_pipe_ops ipc_tran_pipe_ops = { + .p_size = sizeof(ipc_pipe), .p_init = ipc_pipe_init, .p_fini = ipc_pipe_fini, .p_stop = ipc_pipe_stop, @@ -1117,20 +1058,24 @@ ipc_listener_set_sec_desc(void *arg, void *pdesc) } static nni_sp_dialer_ops ipc_dialer_ops = { + .d_size = sizeof(ipc_ep), .d_init = ipc_ep_init_dialer, .d_fini = ipc_ep_fini, .d_connect = ipc_ep_connect, .d_close = ipc_ep_close, + .d_stop = ipc_ep_stop, .d_getopt = ipc_dialer_get, .d_setopt = ipc_dialer_set, }; static nni_sp_listener_ops ipc_listener_ops = { + .l_size = sizeof(ipc_ep), .l_init = ipc_ep_init_listener, .l_fini = ipc_ep_fini, .l_bind = ipc_ep_bind, .l_accept = ipc_ep_accept, .l_close = ipc_ep_close, + .l_stop = ipc_ep_stop, .l_getopt = ipc_listener_get, .l_setopt = ipc_listener_set, .l_set_security_descriptor = ipc_listener_set_sec_desc,