Skip to content

Commit

Permalink
Robust pool wrapper
Browse files Browse the repository at this point in the history
await_pool/teardown now make sense
  • Loading branch information
mtzguido committed Nov 10, 2024
1 parent 323073c commit d930988
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 23 deletions.
56 changes: 36 additions & 20 deletions qs/NuPool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,77 @@

module T = Domainslib.Task
module A = Atomic
module S = Semaphore.Binary
module CV = Condition
open Either

let dbg s = () (* print_string (s ^ "\n"); flush stdout *)
let dbg s = print_string (s ^ "\n"); flush stdout

type 'a task = 'a T.task

type !'a promise = 'a T.promise

type pool = {
p : T.pool;
ctr : int Atomic.t;
sem : S.t;
m : Mutex.t;
ctr : int ref;
cv : CV.t;
}

let setup_pool num_domains =
let p = T.setup_pool ~num_domains () in
let ctr = Atomic.make 0 in
let sem = S.make false in
{ p; ctr; sem }
let ctr = ref 0 in
let m = Mutex.create () in
let cv = CV.create () in
{ p; m; ctr; cv }

let teardown_pool p =
(* S.acquire p.sem; *)
T.teardown_pool p.p
let rec wait_for_empty p =
if !(p.ctr) <> 0 then (
CV.wait p.cv p.m;
wait_for_empty p
)

let wrap_task p (t : 'a task) : 'a task = fun () ->
let r =
match t () with
| exception ex -> Left ex
| v -> Right v
in
let c = A.fetch_and_add p.ctr (-1) in
if c = 1 then (* c == old value *)
S.release p.sem;
Mutex.lock p.m;
p.ctr := !(p.ctr) - 1;
if !(p.ctr) = 0 then
CV.broadcast p.cv;
Mutex.unlock p.m;
match r with
| Left ex -> raise ex (* this will mess with stack traces, but at least won't deadlock *)
| Right v -> v

let run p t =
ignore (A.fetch_and_add p.ctr 1);
T.run p.p (wrap_task p t)
let inc_ctr p =
Mutex.lock p.m;
p.ctr := !(p.ctr) + 1;
(* dbg ("counter now = " ^string_of_int !(p.ctr)); *)
Mutex.unlock p.m

let async p t =
ignore (A.fetch_and_add p.ctr 1);
inc_ctr p;
T.async p.p (wrap_task p t)

let await p h =
T.await p.p h

let await_pool p () () () =
dbg "await_pool.1";
S.acquire p.sem (* not right *) ;
dbg "await_pool.2";
(* dbg "await_pool.1"; *)
Mutex.lock p.m;
(* dbg "await_pool.2"; *)
wait_for_empty p;
(* dbg "await_pool.3"; *)
Mutex.unlock p.m;
()

let teardown_pool p =
Mutex.lock p.m;
wait_for_empty p;
T.teardown_pool p.p

let spawn_ p () () () f =
let _ = async p f in
()
6 changes: 3 additions & 3 deletions qs/driver.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ let dbg s = print_string (s ^ "\n"); flush stdout

let main =
let nproc = 32 in
let len = 100000000 in
let len = 1000000 in
let a = Array.make len 0 in
for i = 0 to len - 1 do
a.(i) <- Random.int 100000000
a.(i) <- Random.int (10 * len)
done;
(* print_string "BEFORE: "; *)
(* Array.iter (fun x -> print_int x; print_string " ") a; *)
(* print_newline (); *)
(* dbg "calling quicksort"; *)
dbg "calling quicksort";
Quicksort_Task.quicksort nproc a 0 len () () ();
(* print_string "AFTER: "; *)
(* Array.iter (fun x -> print_int x; print_string " ") a; *)
Expand Down

0 comments on commit d930988

Please sign in to comment.