Skip to content

Commit

Permalink
logproto: fixed state handling of the new internal handshake_in_progr…
Browse files Browse the repository at this point in the history
…ess flag

There were multiple issues here
- proto change did not force re-handshake if needed
- initial state handling led to a drop of the first flush event both on client and server side that e.g. led to incorrect persist state processing at restart
Signed-off-by: Hofi <[email protected]>
  • Loading branch information
HofiOne committed Nov 25, 2024
1 parent e52373d commit 61b2abb
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 14 deletions.
13 changes: 9 additions & 4 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ log_proto_client_validate_options(LogProtoClient *self)
return self->validate_options(self);
}

static inline gboolean
log_proto_client_needs_handshake(LogProtoClient *s)
{
return s->handshake != NULL;
}

static inline LogProtoStatus
log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished)
{
if (s->handshake)
{
return s->handshake(s, handshake_finished);
}
if (log_proto_client_needs_handshake(s))
return s->handshake(s, handshake_finished);

*handshake_finished = TRUE;
return LPS_SUCCESS;
}
Expand Down
13 changes: 9 additions & 4 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ log_proto_server_validate_options(LogProtoServer *self)
return self->validate_options(self);
}

static inline gboolean
log_proto_server_needs_handshake(LogProtoServer *s)
{
return s->handshake != NULL;
}

static inline LogProtoStatus
log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished)
{
if (s->handshake)
{
return s->handshake(s, handshake_finished);
}
if (log_proto_server_needs_handshake(s))
return s->handshake(s, handshake_finished);

*handshake_finished = TRUE;
return LPS_SUCCESS;
}
Expand Down
15 changes: 11 additions & 4 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,12 @@ log_reader_apply_proto_and_poll_events(LogReader *self, LogProtoServer *proto, P
self->proto = proto;

if (self->proto)
log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
{
log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
self->handshake_in_progress = log_proto_server_needs_handshake(self->proto);
}
else
self->handshake_in_progress = FALSE;

self->poll_events = poll_events;
}
Expand Down Expand Up @@ -501,11 +506,13 @@ log_reader_fetch_log(LogReader *self)

if ((self->options->flags & LR_IGNORE_AUX_DATA))
aux = NULL;

log_transport_aux_data_init(aux);

if (self->handshake_in_progress)
{
return log_reader_process_handshake(self);
gboolean succ = log_reader_process_handshake(self);
if (FALSE == succ || self->handshake_in_progress)
return FALSE;
}

/* NOTE: this loop is here to decrease the load on the main loop, we try
Expand Down Expand Up @@ -777,7 +784,7 @@ log_reader_new(GlobalConfig *cfg)
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
self->handshake_in_progress = FALSE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
g_cond_init(&self->pending_close_cond);
Expand Down
12 changes: 10 additions & 2 deletions lib/logwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,11 @@ log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
return FALSE;

if (self->handshake_in_progress)
return log_writer_process_handshake(self);
{
gboolean succ = log_writer_process_handshake(self);
if (FALSE == succ || self->handshake_in_progress)
return FALSE;
}

/* NOTE: in case we're reloading or exiting we flush all queued items as
* long as the destination can consume it. This is not going to be an
Expand Down Expand Up @@ -1720,7 +1724,11 @@ log_writer_set_proto(LogWriter *self, LogProtoClient *proto)

log_proto_client_set_client_flow_control(self->proto, &flow_control_funcs);
log_proto_client_set_options(self->proto, &self->options->proto_options.super);

self->handshake_in_progress = log_proto_client_needs_handshake(self->proto);
}
else
self->handshake_in_progress = FALSE;
}

static void
Expand Down Expand Up @@ -1923,7 +1931,7 @@ log_writer_new(guint32 flags, GlobalConfig *cfg)
self->flags = flags;
self->line_buffer = g_string_sized_new(128);
self->pollable_state = -1;
self->handshake_in_progress = TRUE;
self->handshake_in_progress = FALSE;
init_sequence_number(&self->seq_num);

log_writer_init_watches(self);
Expand Down

0 comments on commit 61b2abb

Please sign in to comment.