diff --git a/src/katipo.erl b/src/katipo.erl index a1351c4..9d7eb44 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -320,9 +320,18 @@ sslkey = undefined :: undefined | binary() | file:name_all(), sslkey_blob = undefined :: undefined | binary(), keypasswd = undefined :: undefined | binary(), - userpwd = undefined :: undefined | binary() + userpwd = undefined :: undefined | binary(), + async = false :: boolean(), + reply_to = undefined :: pid() | atom() }). +-record(reply_to, { + async = false :: boolean(), + from :: {pid(), any()} | pid(), + tref = undefined :: undefined | reference(), + response_ref = undefined :: undefined | reference() + }). + tcp_fastopen_available() -> ?TCP_FASTOPEN_AVAILABLE. @@ -399,12 +408,16 @@ req(PoolName, Opts) case process_opts(Opts) of {ok, #req{url=undefined}} -> {error, error_map(bad_opts, <<"[{url,undefined}]">>)}; - {ok, Req} -> + {ok, Req=#req{async=Async, reply_to=ReplyTo}} -> Timeout = ?MODULE:get_timeout(Req), Req2 = Req#req{timeout=Timeout}, + Req3 = case {Async, ReplyTo} of + {true, undefined} -> Req2#req{reply_to=self()}; + {_, _} -> Req2 + end, Ts = os:timestamp(), {Result, {Response, Metrics}} = - wpool:call(PoolName, Req2, random_worker, infinity), + wpool:call(PoolName, Req3, random_worker, infinity), TotalUs = timer:now_diff(os:timestamp(), Ts), Metrics2 = katipo_metrics:notify({Result, Response}, Metrics, TotalUs), Response2 = maybe_return_metrics(Req2, Metrics2, Response), @@ -460,7 +473,9 @@ handle_call(#req{method = Method, sslkey = SSLKey, sslkey_blob = SSLKeyBlob, keypasswd = KeyPasswd, - userpwd = UserPwd}, + userpwd = UserPwd, + async = Async, + reply_to = PidOrAtom}, From, State=#state{port=Port, reqs=Reqs}) -> {Self, Ref} = From, @@ -490,9 +505,22 @@ handle_call(#req{method = Method, {?userpwd, UserPwd}], Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts}, true = port_command(Port, term_to_binary(Command)), - Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}), - Reqs2 = maps:put(From, Tref, Reqs), - {noreply, State#state{reqs=Reqs2}}. + ReplyTo = + case Async of + false -> + Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}), + #reply_to{async=Async, from=From, tref=Tref}; + true -> + #reply_to{async=Async, from=PidOrAtom, response_ref=make_ref()} + end, + Reqs2 = maps:put(From, ReplyTo, Reqs), + State2 = State#state{reqs=Reqs2}, + case Async of + false -> + {noreply, State2}; + true -> + {reply, {ok, ReplyTo#reply_to.response_ref}, State2} + end. handle_cast(Msg, State) -> error_logger:error_msg("Unexpected cast: ~p", [Msg]), @@ -512,9 +540,12 @@ handle_info({Port, {data, Data}}, State=#state{port=Port, reqs=Reqs}) -> {error, {From0, {Error, Metrics}}} end, case maps:find(From, Reqs) of - {ok, Tref} -> + {ok, #reply_to{async=false, from=From, tref=Tref}} -> _ = erlang:cancel_timer(Tref), _ = gen_server:reply(From, {Result, Response}); + {ok, #reply_to{async=true, from=PidOrAtom, response_ref=ResponseRef}} -> + PidOrAtom ! {katipo_response, ResponseRef, {Result, Response}}, + ok; error -> ok end, @@ -716,6 +747,10 @@ opt(keypasswd, Pass, {Req, Errors}) when is_binary(Pass) -> {Req#req{keypasswd=Pass}, Errors}; opt(userpwd, UserPwd, {Req, Errors}) when is_binary(UserPwd) -> {Req#req{userpwd=UserPwd}, Errors}; +opt(async, Async, {Req, Errors}) when is_boolean(Async) -> + {Req#req{async=Async}, Errors}; +opt(reply_to, PidOrAtom, {Req, Errors}) when is_pid(PidOrAtom) orelse is_atom(PidOrAtom) -> + {Req#req{reply_to=PidOrAtom}, Errors}; opt(K, V, {Req, Errors}) -> {Req, [{K, V} | Errors]}.