NOTE(review): this patch was re-flowed from a whitespace-collapsed paste. Indentation and
trailing whitespace (including every whitespace-only -/+ pair) are reconstructed by best
guess; regenerate the patch against the tree before applying it.

diff --git a/src/Malt.jl b/src/Malt.jl
index 043392e..9c0bbb9 100644
--- a/src/Malt.jl
+++ b/src/Malt.jl
@@ -104,11 +104,11 @@ mutable struct Worker <: AbstractWorker
         # Spawn process
         cmd = _get_worker_cmd(; env, exeflags)
         proc = open(Cmd(
-                cmd;
-                detach=true,
-                windows_hide=true,
-            ), "w+")
-        
+            cmd;
+            detach=true,
+            windows_hide=true,
+        ), "w+")
+
         # Keep internal list
         __iNtErNaL_get_running_procs()
         push!(__iNtErNaL_running_procs, proc)
@@ -125,11 +125,11 @@ mutable struct Worker <: AbstractWorker
         # There's no reason to keep the worker process alive after the manager loses its handle.
         w = finalizer(w -> @async(stop(w)),
             new(
-                port, 
-                proc, 
+                port,
+                proc,
                 getpid(proc),
-                socket, 
-                MsgID(0), 
+                socket,
+                MsgID(0),
                 Dict{MsgID,Channel{WorkerResult}}(),
             )
         )
@@ -158,15 +158,15 @@ function _exit_loop(worker::Worker)
             end
             sleep(1)
         catch e
-            @error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace())
+            @error "Unexpection error inside the exit loop" worker exception = (e, catch_backtrace())
         end
     end
 end
 
 function _receive_loop(worker::Worker)
     io = worker.current_socket
-    
-    
+
+
     # Here we use:
     # `for _i in Iterators.countfrom(1)`
     # instead of
@@ -620,7 +620,7 @@ function interrupt(w::Worker)
         @warn "Tried to interrupt a worker that has already shut down." summary(w)
     else
         if Sys.iswindows()
-            ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
+            ccall((:GenerateConsoleCtrlEvent, "Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
         else
             Base.kill(w.proc, Base.SIGINT)
         end
@@ -632,7 +632,39 @@ function interrupt(w::InProcessWorker)
     nothing
 end
 
+"""
+    Malt.requestgc(w::Worker)
 
+Request a garbage collection on the worker `w`. This is a non-blocking call (on the worker):
+
+it only notifies `Main._gc_event` in the worker process; the collection itself is done by
+whichever worker task is waiting on that event.
+If `w` is no longer running, a warning is logged and nothing is sent.
+"""
+function requestgc(w::Worker)
+    if !isrunning(w)
+        @warn "Tried to gc a worker that has already shut down." summary(w)
+    else
+        remote_eval_wait(Main, w, :(Base.notify(Main._gc_event)))
+    end
+end
+
+"""
+    Malt.autogc(w::Worker)
+
+Initiate the worker auto gc every `ENV["MALT_AUTO_GC_SECONDS"]` or 900 seconds. The
+environment variable is read inside the worker process.
+
+If `w` is no longer running, a warning is logged and nothing is sent.
+"""
+function autogc(w::Worker)
+    if !isrunning(w)
+        @warn "Tried to gc a worker that has already shut down." summary(w)
+    else
+        # Must match the name the worker script defines: `_auto_gc_event`.
+        remote_eval_wait(Main, w, :(Base.notify(Main._auto_gc_event)))
+    end
+end
 
 
 # Based on `Base.task_done_hook`
diff --git a/src/worker.jl b/src/worker.jl
index 936564d..c7af685 100644
--- a/src/worker.jl
+++ b/src/worker.jl
@@ -73,7 +73,7 @@ function serve(server::Sockets.TCPServer)
             end
             # this next line can't fail
             msg_id = read(io, MsgID)
-            
+
             msg_data, success = try
                 (Base.invokelatest(deserialize, io), true)
             catch err
@@ -81,7 +81,7 @@ function serve(server::Sockets.TCPServer)
             finally
                 _discard_until_boundary(io)
             end
-            
+
             if !success
                 if msg_type === MsgType.from_host_call_with_response
                     msg_type = MsgType.special_serialization_failure
@@ -89,7 +89,7 @@ function serve(server::Sockets.TCPServer)
                     continue
                 end
             end
-            
+
             try
                 @debug("WORKER: Received message", msg_data)
                 handle(Val(msg_type), io, msg_data, msg_id)
@@ -142,7 +142,7 @@ function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg
     @async try
         f(args...; kwargs...)
     catch e
-        @warn("WORKER: Got exception while running call without response", exception=(e, catch_backtrace()))
+        @warn("WORKER: Got exception while running call without response", exception = (e, catch_backtrace()))
         # TODO: exception is ignored, is that what we want here?
     end
 end
@@ -156,11 +156,46 @@ function handle(::Val{MsgType.special_serialization_failure}, socket, msg, msg_i
     )
 end
 
-format_error(err, bt) = sprint() do io
-    Base.invokelatest(showerror, io, err, bt)
+format_error(err, bt) =
+    sprint() do io
+        Base.invokelatest(showerror, io, err, bt)
+    end
+
+const _channel_cache = Dict{UInt64,AbstractChannel}()
+
+# On-demand GC: each time `_gc_event` is notified, this task runs one full collection.
+const _gc_event = Base.Event()
+const _gc_task = Threads.@spawn begin
+    for _i in Iterators.countfrom(1)
+        wait(_gc_event)
+        sleep(5) # throttle by 5 seconds, so the computation can finish
+        bytes1 = Base.gc_live_bytes()
+        Base.GC.gc(true)
+        bytes2 = Base.gc_live_bytes()
+        @debug "WORKER: gc retrieved $(round((bytes1-bytes2)/1024/1024))MB"
+        # Ignore all requests that arrived before or during the gc. Use the public
+        # `reset` API instead of writing the event's internal `set` field.
+        Base.reset(_gc_event)
+    end
 end
 
-const _channel_cache = Dict{UInt64, AbstractChannel}()
+# Periodic GC: dormant until `_auto_gc_event` is notified once, then collects on a timer.
+const _auto_gc_event = Base.Event()
+const _auto_gc_task = Threads.@spawn begin
+    # Notifying will activate the loop. Otherwise this task will just wait forever.
+    wait(_auto_gc_event)
+    for _i in Iterators.countfrom(1)
+        !_auto_gc_event.set && break
+        # Fall back to 900 seconds when the variable is unset, unparsable or not positive,
+        # so that a bad value cannot kill this task or turn it into a busy loop.
+        interval = something(tryparse(Int, get(ENV, "MALT_AUTO_GC_SECONDS", "900")), 900)
+        sleep(interval > 0 ? interval : 900)
+        bytes1 = Base.gc_live_bytes()
+        Base.GC.gc(true)
+        bytes2 = Base.gc_live_bytes()
+        @debug "WORKER: auto gc retrieved $(round((bytes1-bytes2)/1024/1024))MB"
+    end
+end
 
 if abspath(PROGRAM_FILE) == @__FILE__
     main()