diff --git a/docs.md b/docs.md index ef74a499..193b3240 100644 --- a/docs.md +++ b/docs.md @@ -1016,15 +1016,19 @@ end) async:send() ``` -### `uv.new_async(callback)` +### `uv.new_async(callback, [size])` **Parameters:** - `callback`: `callable` - `...`: `threadargs` passed to/from `uv.async_send(async, ...)` +- `size`: `integer` or `nil` (default: `0`) Creates and initializes a new `uv_async_t`. Returns the Lua userdata wrapping it. +If size is omitted (or 0), each call to send discards any pending one; otherwise, a call to send could fail with `ENOSPC` when there are already `size` pending send. + + **Returns:** `uv_async_t userdata` or `fail` **Note**: Unlike other handle initialization functions, this immediately starts @@ -1050,6 +1054,7 @@ every call to it will yield an execution of the callback. For example: if `uv.async_send()` is called 5 times in a row before the callback is called, the callback will only be called once. If `uv.async_send()` is called again after the callback was called, it will be called again. +When specifying a `size` greater than 0, this function will fail when there are pending calls. ## `uv_poll_t` — Poll handle diff --git a/src/async.c b/src/async.c index db25ce45..061e9f2c 100644 --- a/src/async.c +++ b/src/async.c @@ -16,18 +16,87 @@ */ #include "private.h" +typedef struct luv_async_send_s { + luv_thread_arg_t targ; + struct luv_async_send_s* next; +} luv_async_send_t; + +typedef struct { + luv_thread_arg_t targ; + uv_mutex_t mutex; + int max; // FIFO queue in case of max > 0 + int count; + luv_async_send_t* first; + luv_async_send_t* last; +} luv_async_arg_t; + +#define luv_get_async_arg_from_handle(H) ((luv_async_arg_t *) ((luv_handle_t*) (H)->data)->extra) + static uv_async_t* luv_check_async(lua_State* L, int index) { uv_async_t* handle = (uv_async_t*)luv_checkudata(L, index, "uv_async"); luaL_argcheck(L, handle->type == UV_ASYNC && handle->data, index, "Expected uv_async_t"); return handle; } +#define luv_is_async_queue(AA) ((AA)->max > 0) + +static luv_async_send_t* luv_async_pop(luv_async_arg_t* asarg) { + luv_async_send_t* sendarg = asarg->first; + if (sendarg != NULL) { + asarg->count--; + asarg->first = sendarg->next; + if (asarg->first == NULL) { + asarg->last = NULL; + } + } + return sendarg; +} + +static luv_async_send_t* luv_async_push(luv_async_arg_t* asarg) { + luv_async_send_t* sendarg = (luv_async_send_t*)malloc(sizeof(luv_async_send_t)); + if (sendarg != NULL) { + memset(sendarg, 0, sizeof(luv_async_send_t)); + asarg->count++; + if (asarg->last != NULL) { + asarg->last->next = sendarg; + } + asarg->last = sendarg; + if (asarg->first == NULL) { + asarg->first = sendarg; + } + } + return sendarg; +} + static void luv_async_cb(uv_async_t* handle) { luv_handle_t* data = (luv_handle_t*)handle->data; lua_State* L = data->ctx->L; - int n = luv_thread_arg_push(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN); - luv_call_callback(L, data, LUV_ASYNC, n); - luv_thread_arg_clear(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN); + luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle); + uv_mutex_t *argmutex = &asarg->mutex; + luv_thread_arg_t targcpy; // work on a copy of the arguments + int n; + int q = luv_is_async_queue(asarg); + do { + uv_mutex_lock(argmutex); + if (q) { + luv_async_send_t* sendarg = luv_async_pop(asarg); + if (sendarg == NULL) { + uv_mutex_unlock(argmutex); + return; + } + targcpy = sendarg->targ; + free(sendarg); + } else { + targcpy = asarg->targ; + asarg->targ.argc = 0; // empty the shared original, nothing to clear + } + uv_mutex_unlock(argmutex); + n = luv_thread_arg_push(L, &targcpy, LUVF_THREAD_SIDE_MAIN); + if (n >= 0) { + luv_call_callback(L, data, LUV_ASYNC, n); + } + luv_thread_arg_clear(L, &targcpy, LUVF_THREAD_SIDE_MAIN); // clear the copy + } while (q); } static int luv_new_async(lua_State* L) { @@ -35,6 +104,7 @@ static int luv_new_async(lua_State* L) { luv_handle_t* data; int ret; luv_ctx_t* ctx = luv_context(L); + int max = luaL_optinteger(L, 2, 0); luaL_checktype(L, 1, LUA_TFUNCTION); handle = (uv_async_t*)luv_newuserdata(L, uv_handle_size(UV_ASYNC)); ret = uv_async_init(ctx->loop, handle, luv_async_cb); @@ -43,21 +113,83 @@ static int luv_new_async(lua_State* L) { return luv_error(L, ret); } data = luv_setup_handle(L, ctx); - data->extra = (luv_thread_arg_t*)malloc(sizeof(luv_thread_arg_t)); + luv_async_arg_t* asarg = (luv_async_arg_t*)malloc(sizeof(luv_async_arg_t)); + memset(asarg, 0, sizeof(luv_async_arg_t)); + asarg->max = max; + ret = uv_mutex_init(&asarg->mutex); + if (ret < 0) { // unlikely + abort(); + } + data->extra = asarg; data->extra_gc = free; - memset(data->extra, 0, sizeof(luv_thread_arg_t)); handle->data = data; luv_check_callback(L, (luv_handle_t*)handle->data, LUV_ASYNC, 1); return 1; } +// From handle.c +static int luv_handle_gc(lua_State* L); +// From thread.c +static void luv_thread_arg_free(luv_thread_arg_t* args); + +static int luv_async_gc(lua_State* L) { + uv_async_t* handle = *(uv_async_t**)lua_touserdata(L, 1); + luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle); + uv_mutex_t *argmutex = &asarg->mutex; + uv_mutex_lock(argmutex); + if (luv_is_async_queue(asarg)) { + luv_async_send_t* sendarg; + while ((sendarg = luv_async_pop(asarg)) != NULL) { + luv_thread_arg_free(&sendarg->targ); + free(sendarg); + } + } else { + luv_thread_arg_free(&asarg->targ); // in case of a pending send + } + uv_mutex_unlock(argmutex); + uv_mutex_destroy(argmutex); + return luv_handle_gc(L); +} + static int luv_async_send(lua_State* L) { int ret; uv_async_t* handle = luv_check_async(L, 1); - luv_thread_arg_t* arg = (luv_thread_arg_t *)((luv_handle_t*) handle->data)->extra; - - luv_thread_arg_set(L, arg, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD); + luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle); + uv_mutex_t *argmutex = &asarg->mutex; + luv_thread_arg_t targcpy; + ret = luv_thread_arg_set(L, &targcpy, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD); + if (ret < 0) { + luv_thread_arg_free(&targcpy); + return luv_thread_arg_error(L); + } + uv_mutex_lock(argmutex); + if (luv_is_async_queue(asarg)) { + luv_async_send_t* sendarg = NULL; + ret = UV_ENOSPC; + if (asarg->count < asarg->max) { + sendarg = luv_async_push(asarg); + if (sendarg == NULL) { + ret = UV_ENOMEM; + } + } + if (sendarg == NULL) { + uv_mutex_unlock(argmutex); + luv_thread_arg_free(&targcpy); + return luv_error(L, ret); + } + sendarg->targ = targcpy; + } else { + luv_thread_arg_free(&asarg->targ); // in case of a pending send + asarg->targ = targcpy; + } + uv_mutex_unlock(argmutex); ret = uv_async_send(handle); - luv_thread_arg_clear(L, arg, LUVF_THREAD_SIDE_CHILD); return luv_result(L, ret); } + +static void luv_async_init(lua_State* L) { + luaL_getmetatable(L, "uv_async"); + lua_pushcfunction(L, luv_async_gc); + lua_setfield(L, -2, "__gc"); + lua_pop(L, 1); +} diff --git a/src/luv.c b/src/luv.c index 18c01936..c196099c 100644 --- a/src/luv.c +++ b/src/luv.c @@ -908,6 +908,7 @@ LUALIB_API int luaopen_luv (lua_State* L) { luv_req_init(L); luv_handle_init(L); + luv_async_init(L); #if LUV_UV_VERSION_GEQ(1, 28, 0) luv_dir_init(L); #endif diff --git a/src/thread.c b/src/thread.c index 3c21364a..d8021d92 100644 --- a/src/thread.c +++ b/src/thread.c @@ -62,17 +62,25 @@ static const char* luv_getmtname(lua_State *L, int idx) { return name; } +static int luv_thread_arg_set_error(lua_State* L, luv_thread_arg_t* args, int type, int index) { + args->argc = index; + lua_pushinteger(L, type); + lua_pushinteger(L, index + 1); + return -1; +} + static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int top, int flags) { int i; int side = LUVF_THREAD_SIDE(flags); int async = LUVF_THREAD_ASYNC(flags); - + // thread works by reference, async works by copy. idx = idx > 0 ? idx : 1; i = idx; args->flags = flags; while (i <= top && i < LUV_THREAD_MAXNUM_ARG + idx) { - luv_val_t *arg = args->argv + (i - idx); + int ii = i - idx; + luv_val_t *arg = args->argv + ii; arg->type = lua_type(L, i); arg->ref[0] = arg->ref[1] = LUA_NOREF; switch (arg->type) @@ -91,11 +99,15 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int } break; case LUA_TSTRING: - if (async) - { - const char* p = lua_tolstring(L, i, &arg->val.str.len); - arg->val.str.base = malloc(arg->val.str.len); - memcpy((void*)arg->val.str.base, p, arg->val.str.len); + if (async) { + size_t l = 0; + const char* p = lua_tolstring(L, i, &l); + void* b = malloc(l + 1); + if (b == NULL) { + return luv_thread_arg_set_error(L, args, LUA_TNONE, ii); + } + arg->val.str.base = memcpy((void*)b, p, l + 1); + arg->val.str.len = l; } else { arg->val.str.base = lua_tolstring(L, i, &arg->val.str.len); lua_pushvalue(L, i); @@ -103,20 +115,37 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int } break; case LUA_TUSERDATA: - arg->val.udata.data = lua_topointer(L, i); - arg->val.udata.size = lua_rawlen(L, i); - arg->val.udata.metaname = luv_getmtname(L, i); - - if (arg->val.udata.size) { - lua_pushvalue(L, i); - arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX); + { + const void* p = lua_topointer(L, i); + size_t l = lua_rawlen(L, i); + const char* mtname = luv_getmtname(L, i); + if (async) { + if (l > 0) { + void* b = malloc(l); + if (b == NULL) { + return luv_thread_arg_set_error(L, args, LUA_TNONE, ii); + } + p = (const void*)memcpy(b, p, l); + } + if (mtname != NULL) { + size_t ml = strlen(mtname) + 1; + char* b = malloc(ml); + if (b == NULL) { + return luv_thread_arg_set_error(L, args, LUA_TNONE, ii); + } + mtname = (const void*)memcpy(b, mtname, ml); + } + } else { + lua_pushvalue(L, i); + arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX); + } + arg->val.udata.data = p; + arg->val.udata.size = l; + arg->val.udata.metaname = mtname; } break; default: - args->argc = i - idx; - lua_pushinteger(L, arg->type); - lua_pushinteger(L, i - idx + 1); - return -1; + return luv_thread_arg_set_error(L, args, arg->type, ii); } i++; } @@ -124,42 +153,54 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int return args->argc; } +static void luv_thread_arg_free(luv_thread_arg_t* args) { + int i; + for (i = 0; i < args->argc; i++) { + luv_val_t* arg = args->argv + i; + switch (arg->type) { + case LUA_TSTRING: + free((void*)arg->val.str.base); + arg->val.str.base = NULL; + break; + case LUA_TUSERDATA: + if (arg->val.udata.size > 0) { + free((void*)arg->val.udata.data); + arg->val.udata.data = NULL; + } + free((void*)arg->val.udata.metaname); + arg->val.udata.metaname = NULL; + break; + } + } +} + static void luv_thread_arg_clear(lua_State* L, luv_thread_arg_t* args, int flags) { int i; int side = LUVF_THREAD_SIDE(flags); - int set = LUVF_THREAD_SIDE(args->flags); + int setside = LUVF_THREAD_SIDE(args->flags); int async = LUVF_THREAD_ASYNC(args->flags); - if (args->argc == 0) + // clear is safe to be called multiple times from multiple sides. + if (args->argc <= 0) { return; - + } for (i = 0; i < args->argc; i++) { luv_val_t* arg = args->argv + i; switch (arg->type) { case LUA_TSTRING: - if (arg->ref[side] != LUA_NOREF) - { + if (arg->ref[side] != LUA_NOREF) { luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]); arg->ref[side] = LUA_NOREF; - } else { - if(async && set!=side) - { - free((void*)arg->val.str.base); - arg->val.str.base = NULL; - arg->val.str.len = 0; - } } break; case LUA_TUSERDATA: - if (arg->ref[side]!=LUA_NOREF) - { - if (side != set) - { + if (arg->ref[side] != LUA_NOREF) { + if (side != setside) { // avoid custom gc lua_rawgeti(L, LUA_REGISTRYINDEX, arg->ref[side]); lua_pushnil(L); lua_setmetatable(L, -2); - lua_pop(L, -1); + lua_pop(L, 1); } luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]); arg->ref[side] = LUA_NOREF; @@ -169,9 +210,11 @@ static void luv_thread_arg_clear(lua_State* L, luv_thread_arg_t* args, int flags break; } } + if (async && side != setside) { + luv_thread_arg_free(args); + } } -// called only in thread static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags) { int i = 0; int side = LUVF_THREAD_SIDE(flags); @@ -196,18 +239,16 @@ static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags) lua_pushlstring(L, arg->val.str.base, arg->val.str.len); break; case LUA_TUSERDATA: - if (arg->val.udata.size) - { + if (arg->val.udata.size > 0) { char *p = lua_newuserdata(L, arg->val.udata.size); memcpy(p, arg->val.udata.data, arg->val.udata.size); - if (arg->val.udata.metaname) - { + if (arg->val.udata.metaname != NULL) { luaL_getmetatable(L, arg->val.udata.metaname); lua_setmetatable(L, -2); } lua_pushvalue(L, -1); arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX); - }else{ + } else { lua_pushlightuserdata(L, (void*)arg->val.udata.data); } break; @@ -224,6 +265,9 @@ static int luv_thread_arg_error(lua_State *L) { int type = lua_tointeger(L, -2); int pos = lua_tointeger(L, -1); lua_pop(L, 2); + if (type == LUA_TNONE) { + return luaL_error(L, "Error: thread arg failure at %d", pos); + } return luaL_error(L, "Error: thread arg not support type '%s' at %d", lua_typename(L, type), pos); } @@ -306,9 +350,11 @@ static void luv_thread_cb(void* varg) { static void luv_thread_notify_close_cb(uv_handle_t *handle) { luv_thread_t *thread = handle->data; - if (thread->handle != 0) - uv_thread_join(&thread->handle); - + uv_thread_t uvt = thread->handle; + if (uvt != 0) { + thread->handle = 0; + uv_thread_join(&uvt); + } luaL_unref(thread->L, LUA_REGISTRYINDEX, thread->ref); thread->ref = LUA_NOREF; thread->L = NULL; @@ -494,9 +540,12 @@ static int luv_thread_setpriority(lua_State* L) { static int luv_thread_join(lua_State* L) { luv_thread_t* tid = luv_check_thread(L, 1); - int ret = uv_thread_join(&tid->handle); - if (ret < 0) return luv_error(L, ret); - tid->handle = 0; + uv_thread_t uvt = tid->handle; + if (uvt != 0) { + tid->handle = 0; + int ret = uv_thread_join(&uvt); + if (ret < 0) return luv_error(L, ret); + } lua_pushboolean(L, 1); return 1; } diff --git a/src/work.c b/src/work.c index b51bfd11..8b973021 100644 --- a/src/work.c +++ b/src/work.c @@ -113,7 +113,7 @@ static int luv_work_cb(lua_State* L) { return luv_thread_arg_error(L); } lua_pop(L, i); // pop all returned value - luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD); + luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_SIDE_CHILD); } luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD); } else { @@ -154,7 +154,7 @@ static void luv_work_cb_wrapper(uv_work_t* req) { // uv__threadpool_cleanup, so exit is not called in luv_cfpcall. int i = lctx->thrd_cpcall(L, luv_work_cb, (void*)req, LUVF_CALLBACK_NOEXIT); if (i != LUA_OK) { - luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD); + luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_SIDE_CHILD); luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD); } } @@ -177,7 +177,7 @@ static void luv_after_work_cb(uv_work_t* req, int status) { work->ref = LUA_NOREF; luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN); - luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_MAIN); + luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_SIDE_MAIN); free(work); } diff --git a/tests/test-async.lua b/tests/test-async.lua index e6ba667b..d057ea53 100644 --- a/tests/test-async.lua +++ b/tests/test-async.lua @@ -1,32 +1,126 @@ return require('lib/tap')(function (test) test("test pass async between threads", function(p, p, expect, uv) - local before = os.time() local async - async = uv.new_async(expect(function (a,b,c) + async = uv.new_async(expect(function (s, b, i, n, u) p('in async notify callback') - p(a,b,c) - assert(a=='a') - assert(b==true) - assert(c==250) + p(s, b, i, n, u) + assert(s=='a', 'bad string') + assert(b==true, 'bad boolean') + assert(i==250, 'bad integer') + assert(n==3.14, 'bad number') + assert(type(u)=='userdata', 'bad userdata') uv.close(async) end)) - local args = {500, 'string', nil, false, 5, "helloworld",async} - local unpack = unpack or table.unpack - uv.new_thread(function(num,s,null,bool,five,hw,asy) - local uv = require'luv' - assert(type(num) == "number") - assert(type(s) == "string") - assert(null == nil) - assert(bool == false) - assert(five == 5) - assert(hw == 'helloworld') + uv.new_thread(function(asy) assert(type(asy)=='userdata') - assert(uv.async_send(asy,'a',true,250)==0) - uv.sleep(1000) - end, unpack(args)):join() - local elapsed = (os.time() - before) * 1000 - assert(elapsed >= 1000, "elapsed should be at least delay ") + assert(asy:send('a', true, 250, 3.14, io.stderr)==0) + require('luv').sleep(10) + end, async):join() + end) + + test("test async multiple send", function(p, p, expect, uv) + local async + async = uv.new_async(expect(function (v) + p('in async notify callback') + assert(v=='ok') + async:close() + end)) + uv.new_thread(function(asy) + assert(type(asy)=='userdata') + assert(asy:send('not ok')==0) -- will be ignored but it's ok + assert(asy:send('ok')==0) + require('luv').sleep(10) + end, async):join() + end) + + test("test async queue send", function(p, p, expect, uv) + local async + async = uv.new_async(expect(function (v) + p('in async notify callback') + if v == 'close' then + async:close() + else + assert(v=='ok') + end + end, 3), 3) + uv.new_thread(function(asy) + local uv = require('luv') + assert(type(asy)=='userdata') + assert(asy:send('ok')==0) + assert(asy:send('ok')==0) + assert(asy:send('close')==0) + assert(select(3, asy:send('not ok'))=='ENOSPC') + uv.sleep(10) + end, async):join() + end) + + test("test async send from same thread", function(p, p, expect, uv) + local async + async = uv.new_async(expect(function (v) + p('in async notify callback') + assert(v=='ok') + async:close() + end)) + assert(async:send('not ok')==0) -- will be ignored but it's ok + assert(async:send('ok')==0) + assert(pcall(uv.async_send, 'not ok', function() end)==false) -- will fail + uv.run() + end) + + test("test async send during callback", function(p, p, expect, uv) + local async + async = uv.new_async(expect(function (d, v) + p('in async notify callback', d, v) + assert(v=='ok') + if d > 0 then + uv.sleep(d) + else + async:close() + end + end, 2)) + local t = uv.new_thread(function(asy) + local uv = require('luv') + assert(type(asy)=='userdata') + assert(asy:send(100, 'ok')==0) + uv.sleep(10) -- let async callback starts + assert(asy:send(0, 'ok')==0) + uv.sleep(10) + end, async) + uv.run() + t:join() + end) + + test("test pass back async between threads", function(p, p, expect, uv) + local async + async = uv.new_async(expect(function (asy) + async:close() + p('in async notify callback') + assert(type(asy)=='userdata') + assert(debug.getmetatable(asy)) + assert(asy:send('Hi\0', true, 250)==0) -- only working inside callback + local timer = uv.new_timer() + timer:start(10, 0, expect(function() + timer:close() + p('timeout') + assert(not debug.getmetatable(asy)) -- outside callback the userdata loose its metatable + end)) + end)) + local t = uv.new_thread(function(asy) + local uv = require('luv') + assert(type(asy)=='userdata', 'bad aync type') + local as + as = uv.new_async(function (s, b, i) + as:close() + assert(s=='Hi\0', 'bad string') + assert(b==true, 'bad boolean') + assert(i==250, 'bad integer') + end) + assert(asy:send(as)==0) + uv.run() + end, async) + uv.run() + t:join() end) end)