From 29eb6f1e4c01a2f8e98ba0ec0c1581b888ebc685 Mon Sep 17 00:00:00 2001 From: Thibaut Mattio Date: Tue, 8 Dec 2020 23:12:51 +0100 Subject: [PATCH] Implement pre-forking on Unix (#239) --- .gitignore | 60 +++----------- CHANGES.md | 1 + example/simple_middleware/README.md | 2 +- opium/src/app.ml | 117 ++++++++++++++++++++++++---- opium/src/app.mli | 7 ++ 5 files changed, 124 insertions(+), 63 deletions(-) diff --git a/.gitignore b/.gitignore index 2a052037..677885b5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,53 +1,15 @@ -*.cmt -*.cmti -*.o -*.cmo -*.cma -*.cmxs -*.opt -*.run -*.cmi -*.cmx -*.cmxa -*.annot -*.depends -*.cmxi -*.mllib -*.mldylib -*.mlpack -*.odocl -*.odocdir -*.docl -*.docdir -*.cmt -*.a - +# ocamlbuild working directory _build/ -examples/_build/ + +# ocamlbuild targets +*.byte *.native -myocamlbuild.ml -setup.data -setup.log -setup.ml -META -_tags -TAGS -*.omc -*.omakedb -.omakedb.lock -_tests -examples/auth_middleware -examples/exit_hook_example -examples/hello_world -examples/hello_world_basic -examples/hello_world_html -examples/middleware_ua -examples/read_json_body -examples/sample -examples/static_serve_override -examples/uppercase_middleware -lib_test/routes -*.merlin +# Merlin configuring file for Vim and Emacs +.merlin + +# Dune generated files *.install -_opam + +# Local OPAM switch +_opam/ \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index 6a8e628d..83b6c155 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - New `basic_auth` middleware to protect handlers with a `Basic` authentication method (#238) - New `Response.of_file` API for conveniently creating a response of a file (#244) - Add a package `opium-graphql` to easily create GraphQL server with Opium (#235) +- Add a function `App.run_multicore` that uses pre-forking and spawns multiple processes that will handle incoming requests (#239) ## Changed diff --git a/example/simple_middleware/README.md b/example/simple_middleware/README.md index b424d0e4..d49ee612 100644 --- a/example/simple_middleware/README.md +++ b/example/simple_middleware/README.md @@ -1,7 +1,7 @@ # Simple Middleware ``` -dune exec example/exit_hook/main.exe +dune exec example/simple_middleware/main.exe ``` This example shows how to implement a simple middleware. It implements an `Reject UA` middleware that rejects request if the User-Agent contains `"MSIE"`. diff --git a/opium/src/app.ml b/opium/src/app.ml index b29dab3a..c66552b3 100644 --- a/opium/src/app.ml +++ b/opium/src/app.ml @@ -7,7 +7,7 @@ let err_invalid_host host = Lwt.fail_invalid_arg ("Could not get host info for `" ^ host ^ "`") ;; -let run_unix ?backlog ?middlewares ~host ~port handler = +let make_connection_handler ~host ~port ?middlewares handler = let* host_entry = Lwt.catch (fun () -> Lwt_unix.gethostbyname host) @@ -28,12 +28,50 @@ let run_unix ?backlog ?middlewares ~host ~port handler = let app = Rock.App.create ?middlewares ~handler () in Rock.Server_connection.run f app in + Lwt.return (listen_address, connection_handler) +;; + +let run_unix ?backlog ?middlewares ~host ~port handler = + let* listen_address, connection_handler = + make_connection_handler ?middlewares ~host ~port handler + in Lwt_io.establish_server_with_client_socket ?backlog listen_address connection_handler ;; +let run_unix_multicore ?middlewares ~host ~port ~jobs handler = + let listen_address, connection_handler = + Lwt_main.run @@ make_connection_handler ?middlewares ~host ~port handler + in + let socket = + Lwt_unix.socket (Unix.domain_of_sockaddr listen_address) Unix.SOCK_STREAM 0 + in + Lwt_unix.setsockopt socket Unix.SO_REUSEADDR true; + Lwt_main.run + (let+ () = Lwt_unix.bind socket listen_address in + Lwt_unix.listen socket (Lwt_unix.somaxconn () [@ocaml.warning "-3"])); + let rec accept_loop socket instance = + let* socket', sockaddr' = Lwt_unix.accept socket in + Lwt.async (fun () -> connection_handler sockaddr' socket'); + accept_loop socket instance + in + for i = 1 to jobs do + flush_all (); + if Lwt_unix.fork () = 0 + then ( + Lwt.async (fun () -> accept_loop socket i); + let forever, _ = Lwt.wait () in + Lwt_main.run forever; + exit 0) + done; + while true do + Unix.pause () + done +;; + type t = { host : string ; port : int + ; jobs : int ; backlog : int option ; debug : bool ; verbose : bool @@ -58,10 +96,23 @@ let default_not_found _ = ()) ;; +let system_cores = + match Sys.unix with + | false -> + (* TODO: detect number of cores on Windows *) + 1 + | true -> + let ic = Unix.open_process_in "getconf _NPROCESSORS_ONLN" in + let cores = int_of_string (input_line ic) in + ignore (Unix.close_process_in ic); + cores +;; + let empty = { name = "Opium Default Name" ; host = "0.0.0.0" ; port = 3000 + ; jobs = system_cores ; backlog = None ; debug = false ; verbose = false @@ -96,6 +147,7 @@ let to_handler app = ;; let port port t = { t with port } +let jobs jobs t = { t with jobs } let backlog backlog t = { t with backlog = Some backlog } let host host t = { t with host } let cmd_name name t = { t with name } @@ -137,24 +189,48 @@ let any methods route action t = let all = any [ `GET; `POST; `DELETE; `PUT; `HEAD; `OPTIONS ] -let start app = - (* We initialize the middlewares first, because the logger middleware initializes the - logger. *) - let middlewares = attach_middleware app in +let setup_logger app = if app.verbose then ( Logs.set_reporter (Logs_fmt.reporter ()); Logs.set_level (Some Logs.Info)); - if app.debug then Logs.set_level (Some Logs.Debug); + if app.debug then Logs.set_level (Some Logs.Debug) +;; + +let start app = + (* We initialize the middlewares first, because the logger middleware initializes the + logger. *) + let middlewares = attach_middleware app in + setup_logger app; Logs.info (fun f -> f - "Starting Opium on %s:%d%s..." + "Starting Opium on %s:%d%s" app.host app.port - (if app.debug then " (debug)" else "")); + (if app.debug then " (debug mode)" else "")); run_unix ?backlog:app.backlog ~middlewares ~host:app.host ~port:app.port app.not_found ;; +let start_multicore app = + (* We initialize the middlewares first, because the logger middleware initializes the + logger. *) + let middlewares = attach_middleware app in + setup_logger app; + Logs.info (fun f -> + f + "Starting Opium on %s:%d with %d cores%s" + app.host + app.port + app.jobs + (if app.debug then " (debug mode)" else "")); + run_unix_multicore + ~middlewares + ~host:app.host + ~port:app.port + ~jobs:app.jobs + app.not_found +;; + let hashtbl_add_multi tbl x y = let l = try Hashtbl.find tbl x with @@ -185,8 +261,8 @@ let print_middleware_f middlewares = |> List.iter ~f:(Printf.printf "> %s \n") ;; -let cmd_run app port host print_routes print_middleware debug verbose _errors = - let app = { app with debug; verbose; host; port } in +let setup_app app port jobs host print_routes print_middleware debug verbose _errors = + let app = { app with debug; verbose; host; port; jobs } in if print_routes then ( let routes = app.routes in @@ -197,7 +273,7 @@ let cmd_run app port host print_routes print_middleware debug verbose _errors = let middlewares = app.middlewares in print_middleware_f middlewares; exit 0); - app |> start + app ;; module Cmds = struct @@ -218,6 +294,11 @@ module Cmds = struct Arg.(value & opt int default & info [ "p"; "port" ] ~doc) ;; + let jobs default = + let doc = "jobs" in + Arg.(value & opt int default & info [ "j"; "jobs" ] ~doc) + ;; + let host default = let doc = "host" in Arg.(value & opt string default & info [ "h"; "host" ] ~doc) @@ -241,9 +322,10 @@ module Cmds = struct let term = let open Cmdliner.Term in fun app -> - pure cmd_run + pure setup_app $ pure app $ port app.port + $ jobs app.jobs $ host app.host $ routes $ middleware @@ -265,7 +347,7 @@ let run_command' app = match Term.eval (cmd, Cmds.info app.name) with | `Ok a -> Lwt.async (fun () -> - let* _server = a in + let* _server = start a in Lwt.return_unit); let forever, _ = Lwt.wait () in `Ok forever @@ -284,3 +366,12 @@ let run_command app = | `Error -> exit 1 | `Not_running -> exit 0 ;; + +let run_multicore app = + let open Cmdliner in + let cmd = Cmds.term app in + match Term.eval (cmd, Cmds.info app.name) with + | `Ok a -> start_multicore a + | `Error _ -> exit 1 + | _ -> exit 0 +;; diff --git a/opium/src/app.mli b/opium/src/app.mli index f83f88d7..a8687c81 100644 --- a/opium/src/app.mli +++ b/opium/src/app.mli @@ -25,6 +25,7 @@ val host : string -> builder val backlog : int -> builder val port : int -> builder +val jobs : int -> builder val cmd_name : string -> builder (** [not_found] accepts a regular Opium handler that will be used instead of the default @@ -57,6 +58,9 @@ val middleware : Rock.Middleware.t -> builder (** Start an opium server. The thread returned can be cancelled to shutdown the server *) val start : t -> Lwt_io.server Lwt.t +(** Start an opium server with multiple processes. *) +val start_multicore : t -> unit + (** Create a cmdliner command from an app and run lwt's event loop *) val run_command : t -> unit @@ -64,3 +68,6 @@ val run_command : t -> unit returned if the command line arguments are incorrect. `Not_running is returned if the command was completed without the server being launched *) val run_command' : t -> [> `Ok of unit Lwt.t | `Error | `Not_running ] + +(** Create a cmdliner command from an app and spawn with multiple processes. *) +val run_multicore : t -> unit