Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement basic async IO #6505

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Std/Internal.lean
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Authors: Henrik Böving
-/
prelude
import Std.Internal.Parsec
import Std.Internal.UV

/-!
This directory is used for components of the standard library that are either considered
Expand Down
119 changes: 119 additions & 0 deletions src/Std/Internal/UV.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace UV

namespace Loop

/--
Options for configuring the event loop behavior.
-/
structure Loop.Options where
/--
Accumulate the amount of idle time the event loop spends in the event provider.
-/
accumulateIdleTime : Bool := False

/--
Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups
when using a sampling profiler.
-/
blockSigProfSignal : Bool := False

/--
Configures the event loop with the specified options.
-/
@[extern "lean_uv_event_loop_configure"]
opaque configure (options : Loop.Options) : BaseIO Unit

/--
Checks if the event loop is still active and processing events.
-/
@[extern "lean_uv_event_loop_alive"]
opaque alive : BaseIO Bool

end Loop

private opaque TimerImpl : NonemptyType.{0}

/--
`Timer`s are used to generate `IO.Promise`s that resolve after some time.

A `Timer` can be in one of 3 states:
- Right after construction it's initial.
- While it is ticking it's running.
- If it has stopped for some reason it's finished.

This together with whether it was set up as `repeating` with `Timer.new` determines the behavior
of all functions on `Timer`s.
-/
def Timer : Type := TimerImpl.type

instance : Nonempty Timer := TimerImpl.property

namespace Timer

/--
This creates a `Timer` in the initial state and doesn't run it yet.
- If `repeating` is `false` this constructs a timer that resolves once after `durationMs`
milliseconds, counting from when it's run.
- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs`
milliseconds, counting from when it's run. Note that this includes the 0th multiple right after
starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called.
-/
@[extern "lean_uv_timer_mk"]
opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- if `repeating` is `false` and:
- it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs`
milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished.
- it is running or finished, return the same `IO.Promise` that the first call to `next` returned.
- if `repeating` is `true` and:
- it is initial, run it and return a new `IO.Promise` that resolves right away
(as it is the 0th multiple of `durationMs`).
- it is running, check whether the last returned `IO.Promise` is already resolved:
- If it is, return a new `IO.Promise` that resolves upon finishing the next cycle
- If it is not, return the last `IO.Promise`
This ensures that the returned `IO.Promise` resolves at the next repetition of the timer.
- if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one
that never resolves if the timer was stopped before fulfilling the last one.
-/
@[extern "lean_uv_timer_next"]
opaque next (timer : @& Timer) : IO (IO.Promise Unit)

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running and `repeating` is `false` this will delay the resolution of the timer until
`durationMs` milliseconds after the call of this function.
- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the
call of this function, then continue normal ticking behavior from there.
-/
@[extern "lean_uv_timer_reset"]
opaque reset (timer : @& Timer) : IO Unit

/--
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running the execution of the timer is stopped and it is put into the finished state.
Note that if the last `IO.Promise` generated by `next` is unresolved and being waited
on this creates a memory leak and the waiting task is not going to be awoken anymore.
-/
@[extern "lean_uv_timer_stop"]
opaque stop (timer : @& Timer) : IO Unit

end Timer

end UV
end Internal
end Std
2 changes: 1 addition & 1 deletion src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ set(RUNTIME_OBJS debug.cpp thread.cpp mpz.cpp utf8.cpp
object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp
stackinfo.cpp compact.cpp init_module.cpp load_dynlib.cpp io.cpp hash.cpp
platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp
process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp)
process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/event_loop.cpp uv/timer.cpp)
add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS})
set_target_properties(leanrt_initial-exec PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/init_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Author: Leonardo de Moura
#include "runtime/process.h"
#include "runtime/mutex.h"
#include "runtime/init_module.h"
#include "runtime/libuv.h"

namespace lean {
extern "C" LEAN_EXPORT void lean_initialize_runtime_module() {
Expand All @@ -24,6 +25,7 @@ extern "C" LEAN_EXPORT void lean_initialize_runtime_module() {
initialize_mutex();
initialize_process();
initialize_stack_overflow();
initialize_libuv();
}
void initialize_runtime_module() {
lean_initialize_runtime_module();
Expand Down
19 changes: 17 additions & 2 deletions src/runtime/libuv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.

Author: Markus Himmel
*/
Author: Markus Himmel, Sofia Rodrigues
*/
#include <pthread.h>
#include "runtime/libuv.h"
#include "runtime/object.h"

namespace lean {

#ifndef LEAN_EMSCRIPTEN
#include <uv.h>

extern "C" void initialize_libuv() {
initialize_libuv_timer();
initialize_libuv_loop();

lthread([]() { event_loop_run_loop(&global_ev); });
}

/* Lean.libUVVersionFn : Unit → Nat */
extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg o) {
return lean_unsigned_to_nat(uv_version());
}

#else

extern "C" void initialize_libuv() {}

extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg o) {
return lean_box(0);
}

#endif
}
22 changes: 21 additions & 1 deletion src/runtime/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,29 @@
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.

Author: Markus Himmel
Author: Markus Himmel, Sofia Rodrigues
*/
#pragma once
#include <lean/lean.h>
#include "runtime/uv/event_loop.h"
#include "runtime/uv/timer.h"
#include "runtime/alloc.h"
#include "runtime/io.h"
#include "runtime/utf8.h"
#include "runtime/object.h"
#include "runtime/thread.h"
#include "runtime/allocprof.h"
#include "runtime/object.h"

namespace lean {
#ifndef LEAN_EMSCRIPTEN
#include <uv.h>
#endif

extern "C" void initialize_libuv();

// =======================================
// General LibUV functions.
extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg);

}
5 changes: 5 additions & 0 deletions src/runtime/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ inline obj_res st_ref_set(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_re
inline obj_res st_ref_reset(b_obj_arg r, obj_arg w) { return lean_st_ref_reset(r, w); }
inline obj_res st_ref_swap(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_ref_swap(r, v, w); }


extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg);
extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg);
extern "C" LEAN_EXPORT obj_res lean_io_promise_result(obj_arg promise);

// =======================================
// Module initialization/finalization
void initialize_object();
Expand Down
143 changes: 143 additions & 0 deletions src/runtime/uv/event_loop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.

Author: Sofia Rodrigues, Henrik Böving
*/
#include "runtime/uv/event_loop.h"


/*
This file builds a thread safe event loop on top of the thread unsafe libuv event loop.
We achieve this by always having a `uv_async_t` associated with the libuv event loop.
As `uv_async_t` are a thread safe primitive it is safe to send a notification to it from another
thread. Once this notification arrives the event loop suspends its own execution and unlocks a mutex
that protects it. This mutex can then be taken by another thread that wants to work with the event
loop. After that work is done it signals a condition variable that the event loop is waiting on
to continue its execution.
*/

namespace lean {
#ifndef LEAN_EMSCRIPTEN
using namespace std;

event_loop_t global_ev;

// Utility function for error checking. This function is only used inside the
// initializition of the event loop.
static void check_uv(int result, const char * msg) {
if (result != 0) {
std::string err_message = std::string(msg) + ": " + uv_strerror(result);
lean_internal_panic(err_message.c_str());
}
}

// The callback that stops the loop when it's called.
void async_callback(uv_async_t * handle) {
uv_stop(handle->loop);
}

// Interrupts the event loop and stops it so it can receive future requests.
void event_loop_interrupt(event_loop_t * event_loop) {
int result = uv_async_send(&event_loop->async);
(void)result;
lean_assert(result == 0);
}

// Initializes the event loop
void event_loop_init(event_loop_t * event_loop) {
event_loop->loop = uv_default_loop();
check_uv(uv_mutex_init_recursive(&event_loop->mutex), "Failed to initialize mutex");
check_uv(uv_cond_init(&event_loop->cond_var), "Failed to initialize condition variable");
check_uv(uv_async_init(event_loop->loop, &event_loop->async, NULL), "Failed to initialize async");
event_loop->n_waiters = 0;
}

// Locks the event loop for the side of the requesters.
void event_loop_lock(event_loop_t * event_loop) {
if (uv_mutex_trylock(&event_loop->mutex) != 0) {
event_loop->n_waiters++;
event_loop_interrupt(event_loop);
uv_mutex_lock(&event_loop->mutex);
event_loop->n_waiters--;
}
}

// Unlock event loop
void event_loop_unlock(event_loop_t * event_loop) {
if (event_loop->n_waiters == 0) {
uv_cond_signal(&event_loop->cond_var);
}
uv_mutex_unlock(&event_loop->mutex);
}

// Runs the loop and stops when it needs to register new requests.
void event_loop_run_loop(event_loop_t * event_loop) {
while (uv_loop_alive(event_loop->loop)) {
uv_mutex_lock(&event_loop->mutex);

while (event_loop->n_waiters != 0) {
uv_cond_wait(&event_loop->cond_var, &event_loop->mutex);
}

uv_run(event_loop->loop, UV_RUN_ONCE);
/*
* We leave `uv_run` only when `uv_stop` is called as there is always the `uv_async_t` so
* we can never run out of things to wait on. `uv_stop` is only called from `async_callback`
* when another thread wants to work with the event loop so we need to give up the mutex.
*/

uv_mutex_unlock(&event_loop->mutex);
}
}

/* Std.Internal.UV.Loop.configure (options : Loop.Options) : BaseIO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ) {
bool accum = lean_ctor_get_uint8(options, 0);
bool block = lean_ctor_get_uint8(options, 1);

event_loop_lock(&global_ev);

if (accum && uv_loop_configure(global_ev.loop, UV_METRICS_IDLE_TIME) != 0) {
return io_result_mk_error("failed to configure global_ev.loop with UV_METRICS_IDLE_TIME");
}

#if!defined(WIN32) && !defined(_WIN32)
if (block && uv_loop_configure(global_ev.loop, UV_LOOP_BLOCK_SIGNAL, SIGPROF) != 0) {
return io_result_mk_error("failed to configure global_ev.loop with UV_LOOP_BLOCK_SIGNAL");
}
#endif

event_loop_unlock(&global_ev);

return lean_io_result_mk_ok(lean_box(0));
}

/* Std.Internal.UV.Loop.alive : BaseIO UInt64 */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ) {
event_loop_lock(&global_ev);
int is_alive = uv_loop_alive(global_ev.loop);
event_loop_unlock(&global_ev);

return lean_io_result_mk_ok(lean_box(is_alive));
}

void initialize_libuv_loop() {
event_loop_init(&global_ev);
}

#else

/* Std.Internal.UV.Loop.configure (options : Loop.Options) : BaseIO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ) {
return io_result_mk_error("lean_uv_event_loop_configure is not supported");
}

/* Std.Internal.UV.Loop.alive : BaseIO UInt64 */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ) {
return io_result_mk_error("lean_uv_event_loop_alive is not supported");
}

#endif

}
Loading
Loading