Skip to content

Commit

Permalink
Close Escalus TCP connection in a clean way
Browse files Browse the repository at this point in the history
* Wait for the stream end stanza.
* Fail if a TCP error occurs.
* Make connection opening and closing more symmetric.
* Move stopping clients out of the cleaner process
    in order to receive the stream end stanza.

Avoid opening unnecessary XMPP connections when creating users

Fix bosh and ws transports

Fix connection steps after changes

Remove obsolete fields from the client record

Fix bugs related to connection management

Add API for querying TCP connection parameters

Set JID for user without resource

Fix minor issues reported by dialyzer

Update client JID after the ‘bind’ step

Refactor escalus connection handling

- Add missing type specs
- Move tls and zlib setup out of the transport modules
  • Loading branch information
chrzaszcz authored and fenek committed Jun 8, 2017
1 parent 868afbb commit 63645fa
Show file tree
Hide file tree
Showing 10 changed files with 665 additions and 511 deletions.
7 changes: 2 additions & 5 deletions include/escalus.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
-record(client, {
jid :: binary() | undefined,
module :: atom(),
socket :: term(),
ssl :: boolean(),
compress :: {zlib, {Zin::zlib:zstream(), Zout::zlib:zstream()}}
| false,
rcv_pid :: pid(),
event_client :: any()
event_client :: any(),
props :: list()
}).
213 changes: 105 additions & 108 deletions src/escalus_bosh.erl

Large diffs are not rendered by default.

40 changes: 30 additions & 10 deletions src/escalus_cleaner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
% Public API
-export([start/1,
add_client/2,
clean/1,
remove_client/2,
get_clients/1,
stop/1]).

-behaviour(gen_server).
Expand All @@ -46,48 +47,67 @@
clients = [] :: [pid()]
}).

-type state() :: #state{}.

%%%===================================================================
%%% Public API
%%%===================================================================

-spec start(escalus:config()) -> escalus:config().
start(Config) ->
{ok, Pid} = gen_server:start_link(?MODULE, [], []),
[{escalus_cleaner, Pid} | Config].

-spec add_client(escalus:config(), escalus:client()) -> ok.
add_client(Config, Client) ->
gen_server:cast(get_cleaner(Config), {add_client, Client}).
gen_server:call(get_cleaner(Config), {add_client, Client}).

-spec remove_client(escalus:config(), escalus:client()) -> ok.
remove_client(Config, Client) ->
gen_server:call(get_cleaner(Config), {remove_client, Client}).

clean(Config) ->
gen_server:call(get_cleaner(Config), clean).
-spec get_clients(escalus:config()) -> [escalus:client()].
get_clients(Config) ->
gen_server:call(get_cleaner(Config), get_clients).

-spec stop(escalus:config()) -> ok.
stop(Config) ->
gen_server:cast(get_cleaner(Config), stop).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

-spec init(list()) -> {ok, state()}.
init([]) ->
{ok, #state{}}.

handle_call(clean, _From, #state{clients = Clients} = State) ->
lists:foreach(fun escalus_client:stop/1, Clients),
{reply, ok, State#state{clients = []}}.

handle_cast({add_client, Pid}, #state{clients = Clients} = State) ->
{noreply, State#state{clients = [Pid | Clients]}};
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}
| {noreply, state()}
| {stop, normal, ok, state()}.
handle_call({add_client, Client}, _From, #state{clients = Clients} = State) ->
{reply, ok, State#state{clients = [Client | Clients]}};
handle_call({remove_client, Client}, _From, #state{clients = Clients} = State) ->
{reply, ok, State#state{clients = Clients -- [Client]}};
handle_call(get_clients, _From, #state{clients = Clients} = State) ->
{reply, Clients, State}.

-spec handle_cast(term(), state()) -> {noreply, state()} | {stop, normal, state()}.
handle_cast(stop, State) ->
{stop, normal, State}.

-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(_Info, State) ->
{noreply, State}.

-spec terminate(term(), state()) -> term().
terminate(_Reason, #state{clients = []}) ->
ok;
terminate(_Reason, #state{clients = Clients}) ->
error_logger:warning_msg("cleaner finishes dirty: ~p~n", [Clients]),
ok.

-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

Expand Down
60 changes: 39 additions & 21 deletions src/escalus_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
start/3,
send/2,
send_and_wait/2,
stop/1,
stop/2,
wait_for_close/3,
kill_connection/2,
kill/1,
peek_stanzas/1, has_stanzas/1,
wait_for_stanzas/2, wait_for_stanzas/3,
Expand All @@ -36,8 +38,6 @@

-export_type([client/0]).

-import(escalus_compat, [bin/1, unimplemented/0]).

-define(WAIT_FOR_STANZA_TIMEOUT, 1000).

-include("escalus.hrl").
Expand All @@ -55,41 +55,59 @@ start(Config, UserSpec, Resource) ->
EventClient = escalus_event:new_client(Config, UserSpec, Resource),
Options = escalus_users:get_options(Config, UserSpec, Resource, EventClient),
case escalus_connection:start(Options) of
{ok, Conn, Props, _} ->
Jid = make_jid(Props),
Client = Conn#client{jid = Jid, event_client = EventClient},
{ok, Conn, _} ->
Client = Conn#client{event_client = EventClient},
escalus_cleaner:add_client(Config, Client),
{ok, Client};
{error, Error} ->
{error, Error}
end.

start_for(Config, Username, Resource) ->
-spec start_for(escalus:config(), escalus_users:user_spec(), binary()) -> {ok, _}
| {error, _}.
start_for(Config, UserSpec, Resource) ->
%% due to escalus_client:get_user_option hack,
%% those two are equivalent now
start(Config, Username, Resource).
start(Config, UserSpec, Resource).

-spec stop(escalus:config(), client()) -> ok.
stop(Config, Client) ->
escalus_connection:stop(Client),
escalus_cleaner:remove_client(Config, Client).

-spec kill_connection(escalus:config(), client()) -> ok.
kill_connection(Config, Client) ->
escalus_connection:kill(Client),
escalus_cleaner:remove_client(Config, Client).

stop(Conn) ->
escalus_connection:stop(Conn).
-spec wait_for_close(escalus:config(), client(), non_neg_integer()) -> ok.
wait_for_close(Config, Client, Timeout) ->
true = escalus_connection:wait_for_close(Client, Timeout),
escalus_cleaner:remove_client(Config, Client).

-spec kill(client()) -> term().
kill(#client{module = escalus_tcp, rcv_pid = Pid}) ->
erlang:exit(Pid, kill).

-spec peek_stanzas(client()) -> [exml:element()].
peek_stanzas(#client{rcv_pid = Pid}) ->
{messages, Msgs} = process_info(self(), messages),
lists:flatmap(fun ({stanza, #client{rcv_pid = StanzaPid}, Stanza}) when Pid == StanzaPid ->
lists:flatmap(fun ({stanza, StanzaPid, Stanza}) when Pid == StanzaPid ->
[Stanza];
%% FIXME: stream error
(_) ->
[]
end, Msgs).

-spec has_stanzas(client()) -> boolean().
has_stanzas(Client) ->
peek_stanzas(Client) /= [].

-spec wait_for_stanzas(client(), non_neg_integer()) -> [exml:element()].
wait_for_stanzas(Client, Count) ->
wait_for_stanzas(Client, Count, ?WAIT_FOR_STANZA_TIMEOUT).

-spec wait_for_stanzas(client(), non_neg_integer(), non_neg_integer()) -> [exml:element()].
wait_for_stanzas(Client, Count, Timeout) ->
Tref = erlang:send_after(Timeout, self(), TimeoutMsg={timeout, make_ref()}),
Result = do_wait_for_stanzas(Client, Count, TimeoutMsg, []),
Expand All @@ -101,7 +119,7 @@ do_wait_for_stanzas(_Client, 0, _TimeoutMsg, Acc) ->
do_wait_for_stanzas(#client{event_client=EventClient, jid=Jid, rcv_pid=Pid} = Client,
Count, TimeoutMsg, Acc) ->
receive
{stanza, #client{rcv_pid = Pid}, Stanza} ->
{stanza, Pid, Stanza} ->
escalus_event:pop_incoming_stanza(EventClient, Stanza),
escalus_ct:log_stanza(Jid, in, Stanza),
do_wait_for_stanzas(Client, Count - 1, TimeoutMsg, [Stanza|Acc]);
Expand All @@ -110,9 +128,11 @@ do_wait_for_stanzas(#client{event_client=EventClient, jid=Jid, rcv_pid=Pid} = Cl
do_wait_for_stanzas(Client, 0, TimeoutMsg, Acc)
end.

-spec wait_for_stanza(client()) -> exml:element().
wait_for_stanza(Client) ->
wait_for_stanza(Client, ?WAIT_FOR_STANZA_TIMEOUT).

-spec wait_for_stanza(client(), non_neg_integer()) -> exml:element().
wait_for_stanza(Client, Timeout) ->
case wait_for_stanzas(Client, 1, Timeout) of
[Stanza] ->
Expand All @@ -121,40 +141,38 @@ wait_for_stanza(Client, Timeout) ->
error(timeout_when_waiting_for_stanza, [Client, Timeout])
end.

-spec send(client(), exml:element()) -> ok.
send(Client, Packet) ->
escalus_connection:send(Client, Packet).

-spec send_and_wait(client(), exml:element()) -> exml:element().
send_and_wait(Client, Packet) ->
ok = send(Client, Packet),
wait_for_stanza(Client).

-spec is_client(term()) -> boolean().
is_client(#client{}) ->
true;
is_client(_) ->
false.

-spec full_jid(client()) -> binary() | undefined.
full_jid(#client{jid=Jid}) ->
Jid.

-spec short_jid(client()) -> binary().
short_jid(Client) ->
escalus_utils:regexp_get(full_jid(Client), <<"^([^/]*)">>).

-spec username(client()) -> binary().
username(Client) ->
escalus_utils:regexp_get(full_jid(Client), <<"^([^@]*)">>).

-spec server(client()) -> binary().
server(Client) ->
escalus_utils:regexp_get(full_jid(Client), <<"^[^@]*[@]([^/]*)">>).

-spec resource(client()) -> binary().
resource(Client) ->
escalus_utils:get_resource(full_jid(Client)).

%%--------------------------------------------------------------------
%% helpers
%%--------------------------------------------------------------------

make_jid(Proplist) ->
{username, U} = lists:keyfind(username, 1, Proplist),
{server, S} = lists:keyfind(server, 1, Proplist),
{resource, R} = lists:keyfind(resource, 1, Proplist),
<<U/binary, "@", S/binary, "/", R/binary>>.
Loading

0 comments on commit 63645fa

Please sign in to comment.