Skip to content

Commit

Permalink
Implement pre-forking on Unix (#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmattio authored Dec 8, 2020
1 parent 937a945 commit 29eb6f1
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 63 deletions.
60 changes: 11 additions & 49 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,53 +1,15 @@
*.cmt
*.cmti
*.o
*.cmo
*.cma
*.cmxs
*.opt
*.run
*.cmi
*.cmx
*.cmxa
*.annot
*.depends
*.cmxi
*.mllib
*.mldylib
*.mlpack
*.odocl
*.odocdir
*.docl
*.docdir
*.cmt
*.a

# ocamlbuild working directory
_build/
examples/_build/

# ocamlbuild targets
*.byte
*.native
myocamlbuild.ml
setup.data
setup.log
setup.ml
META
_tags
TAGS
*.omc
*.omakedb
.omakedb.lock
_tests

examples/auth_middleware
examples/exit_hook_example
examples/hello_world
examples/hello_world_basic
examples/hello_world_html
examples/middleware_ua
examples/read_json_body
examples/sample
examples/static_serve_override
examples/uppercase_middleware
lib_test/routes
*.merlin
# Merlin configuring file for Vim and Emacs
.merlin

# Dune generated files
*.install
_opam

# Local OPAM switch
_opam/
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- New `basic_auth` middleware to protect handlers with a `Basic` authentication method (#238)
- New `Response.of_file` API for conveniently creating a response of a file (#244)
- Add a package `opium-graphql` to easily create GraphQL server with Opium (#235)
- Add a function `App.run_multicore` that uses pre-forking and spawns multiple processes that will handle incoming requests (#239)

## Changed

Expand Down
2 changes: 1 addition & 1 deletion example/simple_middleware/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Simple Middleware

```
dune exec example/exit_hook/main.exe
dune exec example/simple_middleware/main.exe
```

This example shows how to implement a simple middleware. It implements an `Reject UA` middleware that rejects request if the User-Agent contains `"MSIE"`.
117 changes: 104 additions & 13 deletions opium/src/app.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let err_invalid_host host =
Lwt.fail_invalid_arg ("Could not get host info for `" ^ host ^ "`")
;;

let run_unix ?backlog ?middlewares ~host ~port handler =
let make_connection_handler ~host ~port ?middlewares handler =
let* host_entry =
Lwt.catch
(fun () -> Lwt_unix.gethostbyname host)
Expand All @@ -28,12 +28,50 @@ let run_unix ?backlog ?middlewares ~host ~port handler =
let app = Rock.App.create ?middlewares ~handler () in
Rock.Server_connection.run f app
in
Lwt.return (listen_address, connection_handler)
;;

let run_unix ?backlog ?middlewares ~host ~port handler =
let* listen_address, connection_handler =
make_connection_handler ?middlewares ~host ~port handler
in
Lwt_io.establish_server_with_client_socket ?backlog listen_address connection_handler
;;

let run_unix_multicore ?middlewares ~host ~port ~jobs handler =
let listen_address, connection_handler =
Lwt_main.run @@ make_connection_handler ?middlewares ~host ~port handler
in
let socket =
Lwt_unix.socket (Unix.domain_of_sockaddr listen_address) Unix.SOCK_STREAM 0
in
Lwt_unix.setsockopt socket Unix.SO_REUSEADDR true;
Lwt_main.run
(let+ () = Lwt_unix.bind socket listen_address in
Lwt_unix.listen socket (Lwt_unix.somaxconn () [@ocaml.warning "-3"]));
let rec accept_loop socket instance =
let* socket', sockaddr' = Lwt_unix.accept socket in
Lwt.async (fun () -> connection_handler sockaddr' socket');
accept_loop socket instance
in
for i = 1 to jobs do
flush_all ();
if Lwt_unix.fork () = 0
then (
Lwt.async (fun () -> accept_loop socket i);
let forever, _ = Lwt.wait () in
Lwt_main.run forever;
exit 0)
done;
while true do
Unix.pause ()
done
;;

type t =
{ host : string
; port : int
; jobs : int
; backlog : int option
; debug : bool
; verbose : bool
Expand All @@ -58,10 +96,23 @@ let default_not_found _ =
())
;;

let system_cores =
match Sys.unix with
| false ->
(* TODO: detect number of cores on Windows *)
1
| true ->
let ic = Unix.open_process_in "getconf _NPROCESSORS_ONLN" in
let cores = int_of_string (input_line ic) in
ignore (Unix.close_process_in ic);
cores
;;

let empty =
{ name = "Opium Default Name"
; host = "0.0.0.0"
; port = 3000
; jobs = system_cores
; backlog = None
; debug = false
; verbose = false
Expand Down Expand Up @@ -96,6 +147,7 @@ let to_handler app =
;;

let port port t = { t with port }
let jobs jobs t = { t with jobs }
let backlog backlog t = { t with backlog = Some backlog }
let host host t = { t with host }
let cmd_name name t = { t with name }
Expand Down Expand Up @@ -137,24 +189,48 @@ let any methods route action t =

let all = any [ `GET; `POST; `DELETE; `PUT; `HEAD; `OPTIONS ]

let start app =
(* We initialize the middlewares first, because the logger middleware initializes the
logger. *)
let middlewares = attach_middleware app in
let setup_logger app =
if app.verbose
then (
Logs.set_reporter (Logs_fmt.reporter ());
Logs.set_level (Some Logs.Info));
if app.debug then Logs.set_level (Some Logs.Debug);
if app.debug then Logs.set_level (Some Logs.Debug)
;;

let start app =
(* We initialize the middlewares first, because the logger middleware initializes the
logger. *)
let middlewares = attach_middleware app in
setup_logger app;
Logs.info (fun f ->
f
"Starting Opium on %s:%d%s..."
"Starting Opium on %s:%d%s"
app.host
app.port
(if app.debug then " (debug)" else ""));
(if app.debug then " (debug mode)" else ""));
run_unix ?backlog:app.backlog ~middlewares ~host:app.host ~port:app.port app.not_found
;;

let start_multicore app =
(* We initialize the middlewares first, because the logger middleware initializes the
logger. *)
let middlewares = attach_middleware app in
setup_logger app;
Logs.info (fun f ->
f
"Starting Opium on %s:%d with %d cores%s"
app.host
app.port
app.jobs
(if app.debug then " (debug mode)" else ""));
run_unix_multicore
~middlewares
~host:app.host
~port:app.port
~jobs:app.jobs
app.not_found
;;

let hashtbl_add_multi tbl x y =
let l =
try Hashtbl.find tbl x with
Expand Down Expand Up @@ -185,8 +261,8 @@ let print_middleware_f middlewares =
|> List.iter ~f:(Printf.printf "> %s \n")
;;

let cmd_run app port host print_routes print_middleware debug verbose _errors =
let app = { app with debug; verbose; host; port } in
let setup_app app port jobs host print_routes print_middleware debug verbose _errors =
let app = { app with debug; verbose; host; port; jobs } in
if print_routes
then (
let routes = app.routes in
Expand All @@ -197,7 +273,7 @@ let cmd_run app port host print_routes print_middleware debug verbose _errors =
let middlewares = app.middlewares in
print_middleware_f middlewares;
exit 0);
app |> start
app
;;

module Cmds = struct
Expand All @@ -218,6 +294,11 @@ module Cmds = struct
Arg.(value & opt int default & info [ "p"; "port" ] ~doc)
;;

let jobs default =
let doc = "jobs" in
Arg.(value & opt int default & info [ "j"; "jobs" ] ~doc)
;;

let host default =
let doc = "host" in
Arg.(value & opt string default & info [ "h"; "host" ] ~doc)
Expand All @@ -241,9 +322,10 @@ module Cmds = struct
let term =
let open Cmdliner.Term in
fun app ->
pure cmd_run
pure setup_app
$ pure app
$ port app.port
$ jobs app.jobs
$ host app.host
$ routes
$ middleware
Expand All @@ -265,7 +347,7 @@ let run_command' app =
match Term.eval (cmd, Cmds.info app.name) with
| `Ok a ->
Lwt.async (fun () ->
let* _server = a in
let* _server = start a in
Lwt.return_unit);
let forever, _ = Lwt.wait () in
`Ok forever
Expand All @@ -284,3 +366,12 @@ let run_command app =
| `Error -> exit 1
| `Not_running -> exit 0
;;

let run_multicore app =
let open Cmdliner in
let cmd = Cmds.term app in
match Term.eval (cmd, Cmds.info app.name) with
| `Ok a -> start_multicore a
| `Error _ -> exit 1
| _ -> exit 0
;;
7 changes: 7 additions & 0 deletions opium/src/app.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ val host : string -> builder
val backlog : int -> builder

val port : int -> builder
val jobs : int -> builder
val cmd_name : string -> builder

(** [not_found] accepts a regular Opium handler that will be used instead of the default
Expand Down Expand Up @@ -57,10 +58,16 @@ val middleware : Rock.Middleware.t -> builder
(** Start an opium server. The thread returned can be cancelled to shutdown the server *)
val start : t -> Lwt_io.server Lwt.t

(** Start an opium server with multiple processes. *)
val start_multicore : t -> unit

(** Create a cmdliner command from an app and run lwt's event loop *)
val run_command : t -> unit

(* Run a cmdliner command from an app. Does not launch Lwt's event loop. `Error is
returned if the command line arguments are incorrect. `Not_running is returned if the
command was completed without the server being launched *)
val run_command' : t -> [> `Ok of unit Lwt.t | `Error | `Not_running ]

(** Create a cmdliner command from an app and spawn with multiple processes. *)
val run_multicore : t -> unit

0 comments on commit 29eb6f1

Please sign in to comment.