Skip to content

Commit e557f38

Browse files
committed
Fix issue #7: avoid spinning on send to unconnected socket
Prevent a TCP socket that's not actually connected from causing the driver enm_read_output function from trying repeatedly to send data and failing. Close the socket and indicate connection eof to the caller. Add a regression test for this case.
1 parent 7c319af commit e557f38

File tree

2 files changed

+48
-14
lines changed

2 files changed

+48
-14
lines changed

c_src/enm_drv.c

+23-13
Original file line numberDiff line numberDiff line change
@@ -232,18 +232,18 @@ enm_stop(ErlDrvData drv_data)
232232
}
233233

234234
static int
235-
enm_do_send(EnmData* d, const struct nn_msghdr* msghdr)
235+
enm_do_send(EnmData* d, const struct nn_msghdr* msghdr, int* err)
236236
{
237-
int rc, err;
237+
int rc;
238238

239239
if (msghdr->msg_iovlen == 0)
240240
return 0;
241-
err = 0;
241+
*err = 0;
242242
do {
243243
rc = nn_sendmsg(d->fd, msghdr, NN_DONTWAIT);
244244
if (rc < 0) {
245-
err = errno;
246-
switch (err) {
245+
*err = errno;
246+
switch (*err) {
247247
case EINTR:
248248
case EFSM:
249249
/* do nothing */
@@ -254,10 +254,10 @@ enm_do_send(EnmData* d, const struct nn_msghdr* msghdr)
254254
default:
255255
enm_write_select(d, 0);
256256
enm_read_select(d, 0);
257-
driver_failure(d->port, err);
257+
driver_failure(d->port, *err);
258258
}
259259
}
260-
} while (err == EINTR);
260+
} while (*err == EINTR);
261261
return rc;
262262
}
263263

@@ -280,9 +280,8 @@ enm_outputv(ErlDrvData drv_data, ErlIOVec *ev)
280280
msghdr.msg_control = 0;
281281
err = 0;
282282
do {
283-
rc = enm_do_send(d, &msghdr);
283+
rc = enm_do_send(d, &msghdr, &err);
284284
if (rc < 0) {
285-
err = errno;
286285
if (err == EAGAIN) {
287286
d->b.writable = 0;
288287
break;
@@ -304,9 +303,8 @@ enm_outputv(ErlDrvData drv_data, ErlIOVec *ev)
304303
msghdr.msg_control = 0;
305304
err = 0;
306305
do {
307-
rc = enm_do_send(d, &msghdr);
306+
rc = enm_do_send(d, &msghdr, &err);
308307
if (rc < 0) {
309-
err = errno;
310308
if (err == EAGAIN) {
311309
d->b.writable = 0;
312310
break;
@@ -817,7 +815,7 @@ enm_ready_output(ErlDrvData drv_data, ErlDrvEvent event)
817815
struct nn_msghdr msghdr;
818816
ErlIOVec ev;
819817
ErlDrvSizeT total;
820-
int rc;
818+
int rc, err;
821819

822820
d->b.writable = 1;
823821
total = driver_peekqv(d->port, &ev);
@@ -827,7 +825,19 @@ enm_ready_output(ErlDrvData drv_data, ErlDrvEvent event)
827825
msghdr.msg_iov = (struct nn_iovec*)ev.iov;
828826
msghdr.msg_iovlen = ev.vsize;
829827
msghdr.msg_control = 0;
830-
rc = enm_do_send(d, &msghdr);
828+
rc = enm_do_send(d, &msghdr, &err);
829+
if (rc < 0 && err == EAGAIN) {
830+
/* enm_ready_output (this function) is called when the VM's
831+
select/poll/etc. indicates the socket has become writable, but
832+
if we get here, it means we tried to write but it failed with
833+
EAGAIN, indicating that the socket isn't writable after all. So
834+
we assume things are broken in this case, and signal eof. */
835+
enm_write_select(d, 0);
836+
enm_read_select(d, 0);
837+
nn_close(d->fd);
838+
d->fd = d->sfd = d->rfd = -1;
839+
driver_failure_eof(d->port);
840+
}
831841
if (rc > 0)
832842
driver_deq(d->port, rc);
833843
if (rc == total && d->b.writable) {

test/enm_pipeline.erl

+25-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ pipeline_test_() ->
3232
fun enm:start_link/0,
3333
fun(_) -> enm:stop() end,
3434
[fun fan_out/0,
35-
fun fan_in/0]}.
35+
fun fan_in/0,
36+
fun no_server/0]}.
3637

3738
fan_out() ->
3839
{ok, Push1} = enm:push(),
@@ -144,3 +145,26 @@ do_pull([_|Pulls], Data, Acc) ->
144145
error(pull_timeout)
145146
end.
146147

148+
no_server() ->
149+
%% This test checks that attempts to push to a TCP endpoint that's not
150+
%% there fails (github enm issue #7). The test requires the remote
151+
%% endpoint to not have anything listening and accepting, so the
152+
%% following fold attempts to find such an endpoint. The test won't
153+
%% work if something actually answers at the endpoint the fold chooses.
154+
Port = (catch lists:foldl(
155+
fun(P, _) ->
156+
case gen_tcp:connect("localhost", P, []) of
157+
{error, econnrefused} ->
158+
throw(P);
159+
{ok, S} ->
160+
gen_tcp:close(S),
161+
P;
162+
_ ->
163+
P
164+
end
165+
end, ok, lists:seq(47000,60000))),
166+
Url = "tcp://localhost:"++integer_to_list(Port),
167+
{ok,Push} = enm:push([{connect,Url},list]),
168+
enm:send(Push, "sending the first message"),
169+
?assertMatch({error,closed}, enm:send(Push, "sending the second message")),
170+
ok.

0 commit comments

Comments
 (0)