From e8ba739b05fd793bcc1850c0e7fb06a28e001d92 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 1 Jan 2024 15:07:00 -0800 Subject: [PATCH] fixes #1572 nng creates too many threads This further limits some of the thread counts, but principally it offers a new runtime facility, nng_init_set_parameter(), which can be used to set certain runtime parameters on the number of threads, provided it is called before the rest of application start up. This facility is quite intentionally "undocumented", at least for now, as we want to limit our commitment to it. Still this should be helpful for applications that need to reduce the number of threads that are created. --- CMakeLists.txt | 22 ++- include/nng/nng.h | 234 +++++++++++++++++++------- src/core/CMakeLists.txt | 3 +- src/core/aio.c | 28 ++- src/core/init.c | 62 ++++++- src/core/init.h | 11 +- src/core/init_test.c | 27 +++ src/core/taskq.c | 31 ++-- src/nng.c | 6 + src/platform/posix/posix_resolv_gai.c | 37 ++-- src/platform/windows/win_io.c | 35 ++-- src/platform/windows/win_resolv.c | 53 +++--- 12 files changed, 415 insertions(+), 134 deletions(-) create mode 100644 src/core/init_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 226bb2ebd..7fe8f7129 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2023 Staysail Systems, Inc. +# Copyright 2024 Staysail Systems, Inc. # Copyright (c) 2012 Martin Sustrik All rights reserved. # Copyright (c) 2013 GoPivotal, Inc. All rights reserved. # Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved. @@ -114,17 +114,19 @@ endif () nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS) +set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.") +mark_as_advanced(NNG_RESOLV_CONCURRENCY) if (NNG_RESOLV_CONCURRENCY) add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY}) endif () -mark_as_advanced(NNG_RESOLV_CONCURRENCY) +set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic") +mark_as_advanced(NNG_NUM_TASKQ_THREADS) if (NNG_NUM_TASKQ_THREADS) add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS}) endif () -mark_as_advanced(NNG_NUM_TASKQ_THREADS) -set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit") +set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit") mark_as_advanced(NNG_MAX_TASKQ_THREADS) if (NNG_MAX_TASKQ_THREADS) add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS}) @@ -132,6 +134,12 @@ endif () # Expire threads. This runs the timeout handling, and having more of them # reduces contention on the common locks used for aio expiration. +set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic") +mark_as_advanced(NNG_NUM_EXPIRE_THREADS) +if (NNG_NUM_EXPIRE_THREADS) + add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS}) +endif () + set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit") mark_as_advanced(NNG_MAX_EXPIRE_THREADS) if (NNG_MAX_EXPIRE_THREADS) @@ -140,6 +148,12 @@ endif() # Poller threads. These threads run the pollers. This is mostly used # on Windows right now, as the POSIX platforms use a single threaded poller. +set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic") +if (NNG_NUM_POLLER_THREADS) + add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS}) +endif () +mark_as_advanced(NNG_NUM_POLLER_THREADS) + set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit") mark_as_advanced(NNG_MAX_POLLER_THREADS) if (NNG_MAX_POLLER_THREADS) diff --git a/include/nng/nng.h b/include/nng/nng.h index 98a9a8438..394dd0fd1 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -47,7 +47,7 @@ extern "C" { #ifndef NNG_DEPRECATED #if defined(__GNUC__) || defined(__clang__) -#define NNG_DEPRECATED __attribute__ ((deprecated)) +#define NNG_DEPRECATED __attribute__((deprecated)) #else #define NNG_DEPRECATED #endif @@ -59,7 +59,8 @@ extern "C" { #define NNG_MAJOR_VERSION 1 #define NNG_MINOR_VERSION 7 #define NNG_PATCH_VERSION 0 -#define NNG_RELEASE_SUFFIX "pre" // if non-empty (i.e. "pre"), this is a pre-release +#define NNG_RELEASE_SUFFIX \ + "pre" // if non-empty (i.e. "pre"), this is a pre-release // Maximum length of a socket address. This includes the terminating NUL. // This limit is built into other implementations, so do not change it. @@ -71,7 +72,7 @@ extern "C" { // NNG_PROTOCOL_NUMBER is used by protocol headers to calculate their // protocol number from a major and minor number. Applications should // probably not need to use this. -#define NNG_PROTOCOL_NUMBER(maj, min) (((x) *16) + (y)) +#define NNG_PROTOCOL_NUMBER(maj, min) (((x) * 16) + (y)) // Types common to nng. @@ -101,7 +102,7 @@ typedef struct nng_socket_s { uint32_t id; } nng_socket; -typedef int32_t nng_duration; // in milliseconds +typedef int32_t nng_duration; // in milliseconds // nng_time represents an absolute time since some arbitrary point in the // past, measured in milliseconds. The values are always positive. @@ -199,7 +200,7 @@ enum nng_sockaddr_family { // Scatter/gather I/O. typedef struct nng_iov { - void * iov_buf; + void *iov_buf; size_t iov_len; } nng_iov; @@ -616,7 +617,7 @@ NNG_DECL void nng_aio_finish(nng_aio *, int); // final argument is passed to the cancelfn. The final argument of the // cancellation function is the error number (will not be zero) corresponding // to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED. -typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int); +typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int); NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *); // nng_aio_sleep does a "sleeping" operation, basically does nothing @@ -630,9 +631,9 @@ NNG_DECL void nng_msg_free(nng_msg *); NNG_DECL int nng_msg_realloc(nng_msg *, size_t); NNG_DECL int nng_msg_reserve(nng_msg *, size_t); NNG_DECL size_t nng_msg_capacity(nng_msg *); -NNG_DECL void * nng_msg_header(nng_msg *); +NNG_DECL void *nng_msg_header(nng_msg *); NNG_DECL size_t nng_msg_header_len(const nng_msg *); -NNG_DECL void * nng_msg_body(nng_msg *); +NNG_DECL void *nng_msg_body(nng_msg *); NNG_DECL size_t nng_msg_len(const nng_msg *); NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_insert(nng_msg *, const void *, size_t); @@ -693,7 +694,7 @@ NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe); NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // Flags. -#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer +#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer #define NNG_FLAG_NONBLOCK 2u // Non-blocking operations // Options. @@ -1250,7 +1251,6 @@ NNG_DECL int nng_stream_listener_set_ptr( NNG_DECL int nng_stream_listener_set_addr( nng_stream_listener *, const char *, const nng_sockaddr *); - #ifndef NNG_ELIDE_DEPRECATED // These are legacy APIs that have been deprecated. // Their use is strongly discouraged. @@ -1260,95 +1260,205 @@ NNG_DECL int nng_stream_listener_set_addr( NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *) NNG_DEPRECATED; // Socket options. Use nng_socket_get and nng_socket_set instead. -NNG_DECL int nng_getopt(nng_socket, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt( + nng_socket, const char *, void *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_getopt_bool(nng_socket, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_getopt_int(nng_socket, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_ms(nng_socket, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_ms( + nng_socket, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_size( + nng_socket, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_uint64( + nng_socket, const char *, uint64_t *) NNG_DEPRECATED; NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_getopt_string(nng_socket, const char *, char **) NNG_DEPRECATED; -NNG_DECL int nng_setopt(nng_socket, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_getopt_string( + nng_socket, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_setopt( + nng_socket, const char *, const void *, size_t) NNG_DEPRECATED; NNG_DECL int nng_setopt_bool(nng_socket, const char *, bool) NNG_DEPRECATED; NNG_DECL int nng_setopt_int(nng_socket, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_setopt_ms(nng_socket, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_setopt_ms( + nng_socket, const char *, nng_duration) NNG_DEPRECATED; NNG_DECL int nng_setopt_size(nng_socket, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_setopt_uint64(nng_socket, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_setopt_string(nng_socket, const char *, const char *) NNG_DEPRECATED; +NNG_DECL int nng_setopt_uint64( + nng_socket, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_setopt_string( + nng_socket, const char *, const char *) NNG_DEPRECATED; NNG_DECL int nng_setopt_ptr(nng_socket, const char *, void *) NNG_DEPRECATED; // Context options. Use nng_ctx_get and nng_ctx_set instead. -NNG_DECL int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt( + nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_ctx_getopt_bool(nng_ctx, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_ctx_getopt_int(nng_ctx, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_getopt_size(nng_ctx, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt_ms( + nng_ctx, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt_size( + nng_ctx, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_setopt( + nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_bool(nng_ctx, const char *, bool) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_int(nng_ctx, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_ctx_setopt_ms( + nng_ctx, const char *, nng_duration) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_size(nng_ctx, const char *, size_t) NNG_DEPRECATED; // Dialer options. Use nng_dialer_get and nng_dialer_set instead. -NNG_DECL int nng_dialer_getopt(nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_bool(nng_dialer, const char *, bool *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_int(nng_dialer, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_ms(nng_dialer, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_size(nng_dialer, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt( + nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_bool( + nng_dialer, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_int( + nng_dialer, const char *, int *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_ms( + nng_dialer, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_size( + nng_dialer, const char *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_dialer_getopt_sockaddr( nng_dialer, const char *, nng_sockaddr *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_ptr(nng_dialer, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_string(nng_dialer, const char *, char **) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt(nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_bool(nng_dialer, const char *, bool) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_int(nng_dialer, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_ms(nng_dialer, const char *, nng_duration) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_size(nng_dialer, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_uint64(nng_dialer, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_ptr(nng_dialer, const char *, void *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_string(nng_dialer, const char *, const char *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_uint64( + nng_dialer, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_ptr( + nng_dialer, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_string( + nng_dialer, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt( + nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_bool( + nng_dialer, const char *, bool) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_int( + nng_dialer, const char *, int) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_ms( + nng_dialer, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_size( + nng_dialer, const char *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_uint64( + nng_dialer, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_ptr( + nng_dialer, const char *, void *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_string( + nng_dialer, const char *, const char *) NNG_DEPRECATED; // Listener options. Use nng_listener_get and nng_listener_set instead. -NNG_DECL int nng_listener_getopt(nng_listener, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_bool(nng_listener, const char *, bool *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_int(nng_listener, const char *, int *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt( + nng_listener, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_bool( + nng_listener, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_int( + nng_listener, const char *, int *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_ms( nng_listener, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_size(nng_listener, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_size( + nng_listener, const char *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_sockaddr( nng_listener, const char *, nng_sockaddr *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_uint64( nng_listener, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_ptr(nng_listener, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_string(nng_listener, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_ptr( + nng_listener, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_string( + nng_listener, const char *, char **) NNG_DEPRECATED; NNG_DECL int nng_listener_setopt( nng_listener, const char *, const void *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_bool(nng_listener, const char *, bool) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_int(nng_listener, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_ms(nng_listener, const char *, nng_duration) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_size(nng_listener, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_uint64(nng_listener, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_ptr(nng_listener, const char *, void *) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_bool( + nng_listener, const char *, bool) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_int( + nng_listener, const char *, int) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_ms( + nng_listener, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_size( + nng_listener, const char *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_uint64( + nng_listener, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_ptr( + nng_listener, const char *, void *) NNG_DEPRECATED; NNG_DECL int nng_listener_setopt_string( nng_listener, const char *, const char *) NNG_DEPRECATED; // Pipe options. Use nng_pipe_get instead. -NNG_DECL int nng_pipe_getopt(nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_bool(nng_pipe, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt( + nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_bool( + nng_pipe, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_pipe_getopt_int(nng_pipe, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_ms(nng_pipe, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_size(nng_pipe, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_ms( + nng_pipe, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_size( + nng_pipe, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_sockaddr( + nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_uint64( + nng_pipe, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_ptr( + nng_pipe, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_string( + nng_pipe, const char *, char **) NNG_DEPRECATED; // nng_closeall closes all open sockets. Do not call this from // a library; it will affect all sockets. NNG_DECL void nng_closeall(void) NNG_DEPRECATED; -#endif +#endif // NNG_ELIDE_DEPRECATED + +// nng_init_parameter is used by applications to change a tunable setting. +// This function must be called before any other NNG function for the setting +// to have any effect. This function is also not thread-safe! +// +// The list of parameters supported is *not* documented, and subject to change. +// +// We try to provide sane defaults, so the use here is intended to provide +// more control for applications that cannot use compile-time configuration. +// +// Applications should not depend on this API for correct operation. +// +// This API is intentionally undocumented. +// +// Parameter settings are lost after nng_fini() is called. +typedef int nng_init_parameter; +NNG_DECL void nng_init_set_parameter(nng_init_parameter, uint64_t); + +// The following list of parameters is not part of our API stability promise. +// In particular the set of parameters that are supported, the default values, +// the range of valid values, and semantics associated therein are subject to +// change at any time. We won't go out of our way to break these, and we will +// try to prevent changes here from breaking working applications, but this is +// on a best effort basis only. +// +// NOTE: When removing a value, please leave the enumeration in place and add +// a suffix _RETIRED ... this will preserve the binary values for binary compatibility. +enum { + NNG_INIT_PARAMETER_NONE = 0, // ensure values start at 1. + + // Fix the number of threads used for tasks (callbacks), + // Default is 2 threads per core, capped to NNG_INIT_MAX_TASK_THREADS. + // At least 2 threads will be created in any case. + NNG_INIT_NUM_TASK_THREADS, + + // Fix the number of threads used for expiration. Default is one thread per + // core, capped to NNG_INIT_MAX_EXPIRE_THREADS. At least one thread will be created. + NNG_INIT_NUM_EXPIRE_THREADS, + + // Fix the number of poller threads (used for I/O). Support varies + // by platform (many platforms only support a single poller thread.) + NNG_INIT_NUM_POLLER_THREADS, + + // Fix the number of threads used for DNS resolution. At least one will be used. + // Default is controlled by NNG_RESOLV_CONCURRENCY compile time variable. + NNG_INIT_NUM_RESOLVER_THREADS, + + // Limit the number of threads of created for tasks. + // NNG will always create at least 2 of these in order to prevent deadlocks. + // Zero means no limit. Default is determined by NNG_MAX_TASKQ_THREADS compile time variable. + NNG_INIT_MAX_TASK_THREADS, + + // Limit the number of threads created for expiration. Zero means no limit. + // Default is determined by the NNG_MAX_EXPIRE_THREADS compile time variable. + NNG_INIT_MAX_EXPIRE_THREADS, + + // Limit the number of poller/IO threads created. Zero means no limit. + // Default is determined by NNG_MAX_POLLER_THREADS compile time variable. + NNG_INIT_MAX_POLLER_THREADS, +}; #ifdef __cplusplus } diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 9e5a6bec2..009d6bb0c 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2021 Staysail Systems, Inc. +# Copyright 2024 Staysail Systems, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -78,6 +78,7 @@ nng_test(aio_test) nng_test(buf_size_test) nng_test(errors_test) nng_test(id_test) +nng_test(init_test) nng_test(list_test) nng_test(message_test) nng_test(reconnect_test) diff --git a/src/core/aio.c b/src/core/aio.c index 3d4a56c19..03ddc33bb 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -843,18 +843,28 @@ int nni_aio_sys_init(void) { int num_thr; + int max_thr; -#ifndef NNG_NUM_EXPIRE_THREADS - num_thr = nni_plat_ncpu(); -#else - num_thr = NNG_NUM_EXPIRE_THREADS; +#ifndef NNG_MAX_EXPIRE_THREADS +#define NNG_MAX_EXPIRE_THREADS 8 #endif -#if NNG_MAX_EXPIRE_THREADS > 0 - if (num_thr > NNG_MAX_EXPIRE_THREADS) { - num_thr = NNG_MAX_EXPIRE_THREADS; - } + +#ifndef NNG_NUM_EXPIRE_THREADS +#define NNG_NUM_EXPIRE_THREADS nni_plat_ncpu() #endif + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS); + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 0) { + num_thr = 1; + } nni_aio_expire_q_list = nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); nni_aio_expire_q_cnt = num_thr; diff --git a/src/core/init.c b/src/core/init.c index f2195bcb4..1e4e238b8 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -54,10 +54,69 @@ nni_init(void) return (nni_plat_init(nni_init_helper)); } +// accessing the list of parameters +typedef struct nni_init_param { + nni_list_node node; + nng_init_parameter param; + uint64_t value; +} nni_init_param; + +static nni_list nni_init_params = + NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node); + +void +nni_init_set_param(nng_init_parameter p, uint64_t value) +{ + if (nni_inited) { + // this is paranoia -- if some library code started already + // then we cannot safely change parameters, and modifying the list is not + // thread safe. + return; + } + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + item->value = value; + return; + } + } + if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { + item->param = p; + item->value = value; + nni_list_append(&nni_init_params, item); + } +} + +uint64_t +nni_init_get_param(nng_init_parameter p, uint64_t default_value) +{ + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + return (item->value); + } + } + return (default_value); +} + +static void +nni_init_params_fini(void) +{ + nni_init_param *item; + printf("FINI\n"); + while ((item = nni_list_first(&nni_init_params)) != NULL) { + printf("DOING a removal of %d", (int)item->param); + nni_list_remove(&nni_init_params, item); + NNI_FREE_STRUCT(item); + } +} + void nni_fini(void) { if (!nni_inited) { + // make sure we discard parameters even if we didn't startup + nni_init_params_fini(); return; } nni_sp_tran_sys_fini(); @@ -67,6 +126,7 @@ nni_fini(void) nni_taskq_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_id_map_sys_fini(); + nni_init_params_fini(); nni_plat_fini(); nni_inited = false; diff --git a/src/core/init.h b/src/core/init.h index 4340b15b9..2be7c4994 100644 --- a/src/core/init.h +++ b/src/core/init.h @@ -1,7 +1,6 @@ // -// Copyright 2017 Garrett D'Amore +// Copyright 2024 Staysail Systems, Inc. // Copyright 2017 Capitar IT Group BV -// Copyright 2017 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -23,4 +22,12 @@ int nni_init(void); // that all resources used by the library are released back to the system. void nni_fini(void); +// nni_init_param is used by applications (via nng_init_param) to configure +// some tunable settings at runtime. It must be called before any other NNG +// functions are called, in order to have any effect at all. +void nni_init_set_param(nng_init_parameter, uint64_t value); + +// subsystems can call this to obtain a parameter value. +uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value); + #endif // CORE_INIT_H diff --git a/src/core/init_test.c b/src/core/init_test.c new file mode 100644 index 000000000..0548a9700 --- /dev/null +++ b/src/core/init_test.c @@ -0,0 +1,27 @@ +// +// Copyright 2024 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include "init.h" + +void +test_init_param(void) +{ + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456); + nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); + nng_fini(); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567); +} + + +NUTS_TESTS = { + { "init parameter", test_init_param }, + { NULL, NULL }, +}; \ No newline at end of file diff --git a/src/core/taskq.c b/src/core/taskq.c index d914093bf..6b4389074 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -1,5 +1,5 @@ // -// Copyright 2022 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -245,20 +245,31 @@ nni_task_fini(nni_task *task) int nni_taskq_sys_init(void) { - int nthrs; + int num_thr; + int max_thr; #ifndef NNG_NUM_TASKQ_THREADS - nthrs = nni_plat_ncpu() * 2; -#else - nthrs = NNG_NUM_TASKQ_THREADS; +#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2) #endif -#if NNG_MAX_TASKQ_THREADS > 0 - if (nthrs > NNG_MAX_TASKQ_THREADS) { - nthrs = NNG_MAX_TASKQ_THREADS; - } + +#ifndef NNG_MAX_TASKQ_THREADS +#define NNG_MAX_TASKQ_THREADS 16 #endif - return (nni_taskq_init(&nni_taskq_systq, nthrs)); + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS); + + if (num_thr > max_thr) { + num_thr = max_thr; + } + if (num_thr < 2) { + num_thr = 2; + } + + return (nni_taskq_init(&nni_taskq_systq, num_thr)); } void diff --git a/src/nng.c b/src/nng.c index ce75d8325..965aab86e 100644 --- a/src/nng.c +++ b/src/nng.c @@ -2011,3 +2011,9 @@ nng_version(void) return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr( NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX); } + +void +nng_init_set_parameter(nng_init_parameter p, uint64_t value) +{ + nni_init_set_param(p, value); +} \ No newline at end of file diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index c6abee5ff..1ac5a594d 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -25,14 +25,10 @@ // for it to ensure that names can be looked up concurrently. This isn't // as elegant or scalable as a true asynchronous resolver would be, but // it has the advantage of being fairly portable, and concurrent enough for -// the vast, vast majority of use cases. The total thread count can be +// the vast majority of use cases. The total thread count can be // changed with this define. Note that some platforms may not have a // thread-safe getaddrinfo(). In that case they should set this to 1. -#ifndef NNG_RESOLV_CONCURRENCY -#define NNG_RESOLV_CONCURRENCY 4 -#endif - #ifndef AI_NUMERICSERV #define AI_NUMERICSERV 0 #endif @@ -41,7 +37,8 @@ static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); static bool resolv_fini = false; static nni_list resolv_aios; -static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY]; +static nni_thr *resolv_thrs; +static int resolv_num_thr; typedef struct resolv_item resolv_item; struct resolv_item { @@ -450,14 +447,29 @@ nni_posix_resolv_sysinit(void) resolv_fini = false; nni_aio_list_init(&resolv_aios); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { +#ifndef NNG_RESOLV_CONCURRENCY +#define NNG_RESOLV_CONCURRENCY 4 +#endif + + resolv_num_thr = nni_init_get_param( + NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY); + if (resolv_num_thr < 1) { + resolv_num_thr = 1; + } + // no limit on the maximum for now + resolv_thrs = nni_zalloc(sizeof (nni_thr *) * resolv_num_thr); + if (resolv_thrs == NULL) { + return (NNG_ENOMEM); + } + + for (int i = 0; i < resolv_num_thr; i++) { int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); if (rv != 0) { nni_posix_resolv_sysfini(); return (rv); } } - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_run(&resolv_thrs[i]); } @@ -472,8 +484,11 @@ nni_posix_resolv_sysfini(void) nni_cv_wake(&resolv_cv); nni_mtx_unlock(&resolv_mtx); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { - nni_thr_fini(&resolv_thrs[i]); + if (resolv_thrs != NULL) { + for (int i = 0; i < resolv_num_thr; i++) { + nni_thr_fini(&resolv_thrs[i]); + } + nni_free(resolv_thrs, sizeof (nni_thr *) * resolv_num_thr); } } diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index b08f2e4da..5739e35e7 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -89,26 +89,33 @@ nni_win_io_sysinit(void) HANDLE h; int i; int rv; - int nthr = nni_plat_ncpu() * 2; + int num_thr; + int max_thr; - // Limits on the thread count. This is fairly arbitrary. - if (nthr < 2) { - nthr = 2; - } #ifndef NNG_MAX_POLLER_THREADS #define NNG_MAX_POLLER_THREADS 8 #endif -#if NNG_MAX_POLLER_THREADS > 0 - if (nthr > NNG_MAX_POLLER_THREADS) { - nthr = NNG_MAX_POLLER_THREADS; - } +#ifndef NNG_NUM_POLLER_THREADS +#define NNG_NUM_POLLER_THREADS (nni_plat_ncpu()) #endif - if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_POLLER_THREADS, NNG_MAX_POLLER_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_POLLER_THREADS, NNG_NUM_POLLER_THREADS); + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, num_thr)) == NULL) { return (NNG_ENOMEM); } - win_io_nthr = nthr; + win_io_nthr = num_thr; - h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr); + h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, num_thr); if (h == NULL) { return (nni_win_error(GetLastError())); } @@ -145,7 +152,7 @@ nni_win_io_sysfini(void) nni_thr_fini(&win_io_thrs[i]); } - NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr); + NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr); } #endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 528da451c..989af056f 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -22,23 +22,20 @@ // host file, WINS, or other naming services. As a result, we just build // our own limited asynchronous resolver with threads. -#ifndef NNG_RESOLV_CONCURRENCY -#define NNG_RESOLV_CONCURRENCY 4 -#endif - -static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; -static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); +static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; +static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); static bool resolv_fini = false; static nni_list resolv_aios; -static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY]; +static nni_thr *resolv_thrs; +static int resolv_num_thr; typedef struct resolv_item resolv_item; struct resolv_item { int family; bool passive; - char * host; - char * serv; - nni_aio * aio; + char *host; + char *serv; + nni_aio *aio; nng_sockaddr *sa; }; @@ -159,9 +156,9 @@ resolv_task(resolv_item *item) nni_mtx_lock(&resolv_mtx); if ((probe != NULL) && (item->aio != NULL)) { - struct sockaddr_in * sin; + struct sockaddr_in *sin; struct sockaddr_in6 *sin6; - nni_sockaddr * sa; + nni_sockaddr *sa; sa = item->sa; @@ -270,7 +267,7 @@ resolv_worker(void *notused) nni_mtx_lock(&resolv_mtx); for (;;) { - nni_aio * aio; + nni_aio *aio; resolv_item *item; int rv; @@ -311,9 +308,9 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) int rv; bool v6 = false; bool wrapped = false; - char * port; - char * host; - char * buf; + char *port; + char *host; + char *buf; size_t buf_len; if (addr == NULL) { @@ -411,7 +408,22 @@ nni_win_resolv_sysinit(void) nni_aio_list_init(&resolv_aios); resolv_fini = false; - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { +#ifndef NNG_RESOLV_CONCURRENCY +#define NNG_RESOLV_CONCURRENCY 4 +#endif + + resolv_num_thr = nni_init_get_param( + NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY); + if (resolv_num_thr < 1) { + resolv_num_thr = 1; + } + // no limit on the maximum for now + resolv_thrs = nni_zalloc(sizeof(nni_thr *) * resolv_num_thr); + if (resolv_thrs == NULL) { + return (NNG_ENOMEM); + } + + for (int i = 0; i < resolv_num_thr; i++) { int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); if (rv != 0) { nni_win_resolv_sysfini(); @@ -419,7 +431,7 @@ nni_win_resolv_sysinit(void) } nni_thr_set_name(&resolv_thrs[i], "nng:resolver"); } - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_run(&resolv_thrs[i]); } return (0); @@ -432,9 +444,10 @@ nni_win_resolv_sysfini(void) resolv_fini = true; nni_cv_wake(&resolv_cv); nni_mtx_unlock(&resolv_mtx); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_fini(&resolv_thrs[i]); } + nni_free(resolv_thrs, sizeof(nni_thr *) * resolv_num_thr); } #endif // NNG_PLATFORM_WINDOWS