Skip to content

Commit

Permalink
fixes #1572 nng creates too many threads
Browse files Browse the repository at this point in the history
This further limits some of the thread counts, but principally it
offers a new runtime facility, nng_init_set_parameter(), which can
be used to set certain runtime parameters on the number of threads,
provided it is called before the rest of application start up.

This facility is quite intentionally "undocumented", at least for now,
as we want to limit our commitment to it.  Still this should be helpful
for applications that need to reduce the number of threads that are
created.
  • Loading branch information
gdamore committed Jan 2, 2024
1 parent 07ad78c commit a9e98e5
Show file tree
Hide file tree
Showing 12 changed files with 574 additions and 134 deletions.
22 changes: 18 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2023 Staysail Systems, Inc. <[email protected]>
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
# Copyright (c) 2012 Martin Sustrik All rights reserved.
# Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
# Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved.
Expand Down Expand Up @@ -114,24 +114,32 @@ endif ()

nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS)

set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.")
mark_as_advanced(NNG_RESOLV_CONCURRENCY)
if (NNG_RESOLV_CONCURRENCY)
add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY})
endif ()
mark_as_advanced(NNG_RESOLV_CONCURRENCY)

set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic")
mark_as_advanced(NNG_NUM_TASKQ_THREADS)
if (NNG_NUM_TASKQ_THREADS)
add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS})
endif ()
mark_as_advanced(NNG_NUM_TASKQ_THREADS)

set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit")
set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit")
mark_as_advanced(NNG_MAX_TASKQ_THREADS)
if (NNG_MAX_TASKQ_THREADS)
add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS})
endif ()

# Expire threads. This runs the timeout handling, and having more of them
# reduces contention on the common locks used for aio expiration.
set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic")
mark_as_advanced(NNG_NUM_EXPIRE_THREADS)
if (NNG_NUM_EXPIRE_THREADS)
add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS})
endif ()

set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit")
mark_as_advanced(NNG_MAX_EXPIRE_THREADS)
if (NNG_MAX_EXPIRE_THREADS)
Expand All @@ -140,6 +148,12 @@ endif()

# Poller threads. These threads run the pollers. This is mostly used
# on Windows right now, as the POSIX platforms use a single threaded poller.
set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic")
if (NNG_NUM_POLLER_THREADS)
add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS})
endif ()
mark_as_advanced(NNG_NUM_POLLER_THREADS)

set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit")
mark_as_advanced(NNG_MAX_POLLER_THREADS)
if (NNG_MAX_POLLER_THREADS)
Expand Down
234 changes: 172 additions & 62 deletions include/nng/nng.h

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2021 Staysail Systems, Inc. <[email protected]>
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
Expand Down Expand Up @@ -78,6 +78,7 @@ nng_test(aio_test)
nng_test(buf_size_test)
nng_test(errors_test)
nng_test(id_test)
nng_test(init_test)
nng_test(list_test)
nng_test(message_test)
nng_test(reconnect_test)
Expand Down
29 changes: 20 additions & 9 deletions src/core/aio.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -843,18 +843,29 @@ int
nni_aio_sys_init(void)
{
int num_thr;
int max_thr;

#ifndef NNG_NUM_EXPIRE_THREADS
num_thr = nni_plat_ncpu();
#else
num_thr = NNG_NUM_EXPIRE_THREADS;
#ifndef NNG_MAX_EXPIRE_THREADS
#define NNG_MAX_EXPIRE_THREADS 8
#endif
#if NNG_MAX_EXPIRE_THREADS > 0
if (num_thr > NNG_MAX_EXPIRE_THREADS) {
num_thr = NNG_MAX_EXPIRE_THREADS;
}

#ifndef NNG_NUM_EXPIRE_THREADS
#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu())
#endif

max_thr = (int) nni_init_get_param(
NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS);

num_thr = (int) nni_init_get_param(
NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS);

if ((max_thr > 0) && (num_thr > max_thr)) {
num_thr = max_thr;
}
if (num_thr < 1) {
num_thr = 1;
}
nni_init_set_effective(NNG_INIT_NUM_EXPIRE_THREADS, num_thr);
nni_aio_expire_q_list =
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
nni_aio_expire_q_cnt = num_thr;
Expand Down
100 changes: 99 additions & 1 deletion src/core/init.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -54,10 +54,107 @@ nni_init(void)
return (nni_plat_init(nni_init_helper));
}

// accessing the list of parameters
typedef struct nni_init_param {
nni_list_node node;
nng_init_parameter param;
uint64_t value;
#ifdef NNG_TEST_LIB
uint64_t effective;
#endif
} nni_init_param;

static nni_list nni_init_params =
NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node);

void
nni_init_set_param(nng_init_parameter p, uint64_t value)
{
if (nni_inited) {
// this is paranoia -- if some library code started already
// then we cannot safely change parameters, and modifying the
// list is not thread safe.
return;
}
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
item->value = value;
return;
}
}
if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
item->param = p;
item->value = value;
nni_list_append(&nni_init_params, item);
}
}

uint64_t
nni_init_get_param(nng_init_parameter p, uint64_t default_value)
{
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
return (item->value);
}
}
return (default_value);
}

void
nni_init_set_effective(nng_init_parameter p, uint64_t value)
{
#ifdef NNG_TEST_LIB
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
item->effective = value;
return;
}
}
if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
item->param = p;
item->effective = value;
nni_list_append(&nni_init_params, item);
}
#else
NNI_ARG_UNUSED(p);
NNI_ARG_UNUSED(value);
#endif
}

#ifdef NNG_TEST_LIB
uint64_t
nni_init_get_effective(nng_init_parameter p)
{
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
return (item->effective);
}
}
return ((uint64_t)-1);
}
#endif


static void
nni_init_params_fini(void)
{
nni_init_param *item;
while ((item = nni_list_first(&nni_init_params)) != NULL) {
nni_list_remove(&nni_init_params, item);
NNI_FREE_STRUCT(item);
}
}

void
nni_fini(void)
{
if (!nni_inited) {
// make sure we discard parameters even if we didn't startup
nni_init_params_fini();
return;
}
nni_sp_tran_sys_fini();
Expand All @@ -67,6 +164,7 @@ nni_fini(void)
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_id_map_sys_fini();
nni_init_params_fini();

nni_plat_fini();
nni_inited = false;
Expand Down
14 changes: 12 additions & 2 deletions src/core/init.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2017 Capitar IT Group BV <[email protected]>
// Copyright 2017 Staysail Systems, Inc. <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand All @@ -23,4 +22,15 @@ int nni_init(void);
// that all resources used by the library are released back to the system.
void nni_fini(void);

// nni_init_param is used by applications (via nng_init_param) to configure
// some tunable settings at runtime. It must be called before any other NNG
// functions are called, in order to have any effect at all.
void nni_init_set_param(nng_init_parameter, uint64_t value);

// subsystems can call this to obtain a parameter value.
uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value);

// subsystems can set this to facilitate tests (only used in test code)
void nni_init_set_effective(nng_init_parameter p, uint64_t value);

#endif // CORE_INIT_H
Loading

0 comments on commit a9e98e5

Please sign in to comment.