Skip to content

Commit

Permalink
First PoC for solving threading issues in node loader.
Browse files Browse the repository at this point in the history
  • Loading branch information
viferga committed May 17, 2024
1 parent 4768980 commit 6e09cc8
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 33 deletions.
157 changes: 124 additions & 33 deletions source/loaders/node_loader/source/node_loader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct loader_impl_node_type
loader_impl_async_initialize_safe initialize_safe;
napi_threadsafe_function threadsafe_initialize;

/* TODO: Remove all napi_value and arguments from here -> */
napi_value execution_path_safe_ptr;
loader_impl_async_execution_path_safe execution_path_safe;
napi_threadsafe_function threadsafe_execution_path;
Expand All @@ -243,8 +244,8 @@ struct loader_impl_node_type
loader_impl_async_discover_safe discover_safe;
napi_threadsafe_function threadsafe_discover;

napi_value func_call_safe_ptr;
loader_impl_async_func_call_safe func_call_safe;
// napi_value func_call_safe_ptr;
// loader_impl_async_func_call_safe func_call_safe;
napi_threadsafe_function threadsafe_func_call;

napi_value func_await_safe_ptr;
Expand All @@ -266,10 +267,12 @@ struct loader_impl_node_type
napi_value destroy_safe_ptr;
loader_impl_async_destroy_safe destroy_safe;
napi_threadsafe_function threadsafe_destroy;
/* TODO: -> To here*/

uv_thread_t thread;
uv_loop_t *thread_loop;

/* TODO: Delete mutex and condition */
uv_mutex_t mutex;
uv_cond_t cond;
std::atomic_bool locked;
Expand Down Expand Up @@ -372,6 +375,9 @@ struct loader_impl_async_func_call_safe_type
size_t size;
napi_value recv;
function_return ret;

uv_mutex_t mutex;
uv_cond_t cond;
};

struct loader_impl_async_func_await_safe_type
Expand Down Expand Up @@ -1360,38 +1366,88 @@ int function_node_interface_create(function func, function_impl impl)
return (node_func->argv == NULL);
}

/* TODO: Convert this into a templated lambda */
void node_loader_impl_function_call_js_func_call_safe(napi_env env, napi_value js_callback, void *context, void *data)
{
loader_impl_async_func_call_safe func_call_safe = static_cast<loader_impl_async_func_call_safe>(data);

(void)js_callback;
(void)context;

if (env != NULL && js_callback != NULL)
{
/* Lock the call safe mutex and get the parameters */
uv_mutex_lock(&func_call_safe->mutex);

/* Store environment for reentrant calls */
func_call_safe->node_impl->env = env;

/* Call to the implementation function */
node_loader_impl_func_call_safe(env, func_call_safe);

/* Clear environment */
// func_call_cast.safe->node_impl->env = NULL;

/* Signal function call condition */
uv_cond_signal(&func_call_safe->cond);

uv_mutex_unlock(&func_call_safe->mutex);
}
}

function_return function_node_interface_invoke(function func, function_impl impl, function_args args, size_t size)
{
loader_impl_node_function node_func = (loader_impl_node_function)impl;

if (node_func != NULL)
{
loader_impl_node node_impl = node_func->node_impl;
function_return ret = NULL;
napi_status status;

/* Set up call safe arguments */
node_impl->func_call_safe->node_impl = node_impl;
node_impl->func_call_safe->func = func;
node_impl->func_call_safe->node_func = node_func;
node_impl->func_call_safe->args = static_cast<void **>(args);
node_impl->func_call_safe->size = size;
node_impl->func_call_safe->recv = NULL;
node_impl->func_call_safe->ret = NULL;

/* Check if we are in the JavaScript thread */
if (node_impl->js_thread_id == std::this_thread::get_id())
{
loader_impl_async_func_call_safe_type func_call_safe;

/* Set up call safe arguments */
func_call_safe.node_impl = node_impl;
func_call_safe.func = func;
func_call_safe.node_func = node_func;
func_call_safe.args = static_cast<void **>(args);
func_call_safe.size = size;
func_call_safe.recv = NULL;
func_call_safe.ret = NULL;

/* We are already in the V8 thread, we can call safely */
node_loader_impl_func_call_safe(node_impl->env, node_impl->func_call_safe);
node_loader_impl_func_call_safe(node_impl->env, &func_call_safe);

/* Set up return of the function call */
ret = node_impl->func_call_safe->ret;
return func_call_safe.ret;
}

/* TODO: Refactor this properly */

/* Lock the mutex and set the parameters */
else if (node_impl->locked.load() == false && uv_mutex_trylock(&node_impl->mutex) == 0)
// if (node_impl->locked.load() == false && uv_mutex_trylock(&node_impl->mutex) == 0)
{
node_impl->locked.store(true);
loader_impl_async_func_call_safe_type func_call_safe;
function_return ret = NULL;

// node_impl->locked.store(true);

/* Set up call safe arguments */
func_call_safe.node_impl = node_impl;
func_call_safe.func = func;
func_call_safe.node_func = node_func;
func_call_safe.args = static_cast<void **>(args);
func_call_safe.size = size;
func_call_safe.recv = NULL;
func_call_safe.ret = NULL;

uv_mutex_init(&func_call_safe.mutex);
uv_cond_init(&func_call_safe.cond);

uv_mutex_lock(&func_call_safe.mutex);

/* Acquire the thread safe function in order to do the call */
status = napi_acquire_threadsafe_function(node_impl->threadsafe_func_call);
Expand All @@ -1402,13 +1458,24 @@ function_return function_node_interface_invoke(function func, function_impl impl
}

/* Execute the thread safe call in a nonblocking manner */
status = napi_call_threadsafe_function(node_impl->threadsafe_func_call, nullptr, napi_tsfn_nonblocking);
status = napi_call_threadsafe_function(node_impl->threadsafe_func_call, &func_call_safe, napi_tsfn_nonblocking);

if (status != napi_ok)
{
log_write("metacall", LOG_LEVEL_ERROR, "Invalid to call to thread safe function invoke function in NodeJS loader");
}

/* Wait for the execution of the safe call */
uv_cond_wait(&func_call_safe.cond, &func_call_safe.mutex);

/* Set up return of the function call */
ret = func_call_safe.ret;

// node_impl->locked.store(false);

/* Unlock the mutex */
uv_mutex_unlock(&func_call_safe.mutex);

/* Release call safe function */
status = napi_release_threadsafe_function(node_impl->threadsafe_func_call, napi_tsfn_release);

Expand All @@ -1417,23 +1484,11 @@ function_return function_node_interface_invoke(function func, function_impl impl
log_write("metacall", LOG_LEVEL_ERROR, "Invalid to release thread safe function invoke function in NodeJS loader");
}

/* Wait for the execution of the safe call */
uv_cond_wait(&node_impl->cond, &node_impl->mutex);

/* Set up return of the function call */
ret = node_impl->func_call_safe->ret;

node_impl->locked.store(false);
uv_mutex_destroy(&func_call_safe.mutex);
uv_cond_destroy(&func_call_safe.cond);

/* Unlock the mutex */
uv_mutex_unlock(&node_impl->mutex);
}
else
{
log_write("metacall", LOG_LEVEL_ERROR, "Potential deadlock detected in function_node_interface_invoke, the call has not been executed in order to avoid the deadlock");
return ret;
}

return ret;
}

return NULL;
Expand Down Expand Up @@ -3876,15 +3931,51 @@ void *node_loader_impl_register(void *node_impl_ptr, void *env_ptr, void *functi

/* Safe function call */
{
/* TODO: Refactor this */

static const char threadsafe_func_name_str[] = "node_loader_impl_async_func_call_safe";

/*
node_loader_impl_thread_safe_function_initialize<loader_impl_async_func_call_safe_type>(
env,
threadsafe_func_name_str, sizeof(threadsafe_func_name_str),
&node_loader_impl_async_func_call_safe,
(loader_impl_async_func_call_safe_type **)(&node_impl->func_call_safe),
&node_impl->func_call_safe_ptr,
&node_impl->threadsafe_func_call);
*/

/*
void node_loader_impl_thread_safe_function_initialize(napi_env env,
const char name[], size_t size, napi_value (*callback)(napi_env, napi_callback_info), T **data,
napi_value *ptr, napi_threadsafe_function *threadsafe_function)
*/

napi_value func_call_safe_ptr;

/* Initialize call safe function with context */
status = napi_create_function(env, nullptr, 0, &node_loader_impl_async_func_call_safe, nullptr, &func_call_safe_ptr);

node_loader_impl_exception(env, status);

/* Create call safe function */
napi_value threadsafe_func_name;

status = napi_create_string_utf8(env, threadsafe_func_name_str, sizeof(threadsafe_func_name_str), &threadsafe_func_name);

node_loader_impl_exception(env, status);

// TODO: Does this number must be equivalent to the number of the threads of NodeJS?
unsigned int processor_count = std::thread::hardware_concurrency();

status = napi_create_threadsafe_function(env, func_call_safe_ptr,
nullptr, threadsafe_func_name,
0, processor_count,
nullptr, nullptr,
nullptr, &node_loader_impl_function_call_js_func_call_safe,
&node_impl->threadsafe_func_call);

node_loader_impl_exception(env, status);
}

/* Safe function await */
Expand Down Expand Up @@ -5389,7 +5480,7 @@ int node_loader_impl_destroy(loader_impl impl)
delete node_impl->load_from_memory_safe;
delete node_impl->clear_safe;
delete node_impl->discover_safe;
delete node_impl->func_call_safe;
// delete node_impl->func_call_safe;
delete node_impl->func_await_safe;
delete node_impl->func_destroy_safe;
delete node_impl->future_await_safe;
Expand Down
1 change: 1 addition & 0 deletions source/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ add_subdirectory(metacall_node_python_deadlock_test)
# add_subdirectory(metacall_node_signal_handler_test) # Note: Not used anymore but leaving it here for reference to solve this: https://github.com/metacall/core/issues/121
add_subdirectory(metacall_node_native_code_test)
add_subdirectory(metacall_node_extension_test)
add_subdirectory(metacall_node_multithread_deadlock_test)
add_subdirectory(metacall_distributable_test)
add_subdirectory(metacall_cast_test)
add_subdirectory(metacall_init_fini_test)
Expand Down
Loading

0 comments on commit 6e09cc8

Please sign in to comment.