From 19f1b12d86269bbe32708a24d5cb4492074389ff Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 4 Feb 2024 15:28:52 +0100 Subject: [PATCH 01/13] libtest: fixed LogTransportMock to not return bytes if the buffer size is 0 Signed-off-by: Balazs Scheidler --- libtest/mock-transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libtest/mock-transport.c b/libtest/mock-transport.c index 8ad2929145..f283a4893c 100644 --- a/libtest/mock-transport.c +++ b/libtest/mock-transport.c @@ -211,7 +211,7 @@ log_transport_mock_read_method(LogTransport *s, gpointer buf, gsize count, LogTr switch (g_array_index(self->value, data_t, self->current_value_ndx).type) { case DATA_STRING: - if (self->input_is_a_stream) + if (self->input_is_a_stream && count > 0) count = 1; current_iov = &g_array_index(self->value, data_t, self->current_value_ndx).iov; From 0d59a96192d9238237fcbe70db01a16b657770dd Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sat, 3 Feb 2024 15:47:34 +0100 Subject: [PATCH 02/13] logtransport: add read_ahead() method Signed-off-by: Balazs Scheidler --- lib/transport/logtransport.c | 93 ++++++++++++++++ lib/transport/logtransport.h | 21 +++- lib/transport/tests/Makefile.am | 7 ++ lib/transport/tests/test_transport.c | 160 +++++++++++++++++++++++++++ 4 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 lib/transport/tests/test_transport.c diff --git a/lib/transport/logtransport.c b/lib/transport/logtransport.c index 61fd5f11b4..6761214312 100644 --- a/lib/transport/logtransport.c +++ b/lib/transport/logtransport.c @@ -27,6 +27,99 @@ #include +gssize +_log_transport_combined_read_with_read_ahead(LogTransport *self, + gpointer buf, gsize count, + LogTransportAuxData *aux) +{ + gsize ra_left = self->ra.buf_len - self->ra.pos; + gsize ra_count = count <= ra_left ? count : ra_left; + + if (ra_count > 0) + { + /* prepend data from read ahead buffer */ + memcpy(buf, &self->ra.buf[self->ra.pos], ra_count); + self->ra.pos += ra_count; + if (self->ra.pos < self->ra.buf_len) + { + return ra_count; + } + } + else + { + self->ra.buf_len = self->ra.pos = 0; + errno = EAGAIN; + return -1; + } + + buf = ((gchar *) buf) + ra_count; + count -= ra_count; + + if (count > 0) + { + /* need to read more */ + gssize rc = self->read(self, buf, count, aux); + if (rc < 0) + { + if (errno == EAGAIN) + return ra_count; + /* error, we put the bytes back to our read_ahead.buf */ + self->ra.pos -= ra_count; + return rc; + } + else + return rc + ra_count; + } + else + return ra_count; +} + + +/* NOTE: this would repeat the entire read operation if you invoke it + * multiple times. The maximum size of read_ahead is limited by the size of + * self->ra.buf[] + */ +gssize +log_transport_read_ahead(LogTransport *self, gpointer buf, gsize buflen, gboolean *moved_forward) +{ + gsize buffer_space = MIN(buflen, sizeof(self->ra.buf)); + gsize count = buffer_space > self->ra.buf_len ? buffer_space - self->ra.buf_len : 0; + gint rc = 0; + + g_assert(buflen <= sizeof(self->ra.buf)); + + /* read at the end of the read_ahead buffer */ + if (count > 0) + { + rc = self->read(self, + &self->ra.buf[self->ra.buf_len], + count, + NULL); + + if (rc < 0) + { + if (moved_forward) + *moved_forward = FALSE; + return rc; + } + } + + if (moved_forward) + *moved_forward = rc > 0; + + self->ra.buf_len += rc; + + if (self->ra.buf_len > 0) + { + rc = MIN(self->ra.buf_len, buflen); + memcpy(buf, self->ra.buf, rc); + return rc; + } + + return 0; +} + + void log_transport_free_method(LogTransport *s) { diff --git a/lib/transport/logtransport.h b/lib/transport/logtransport.h index c6d7e00f32..27f143bac0 100644 --- a/lib/transport/logtransport.h +++ b/lib/transport/logtransport.h @@ -34,11 +34,19 @@ struct _LogTransport { gint fd; GIOCondition cond; - const gchar *name; + gssize (*read)(LogTransport *self, gpointer buf, gsize count, LogTransportAuxData *aux); gssize (*write)(LogTransport *self, const gpointer buf, gsize count); gssize (*writev)(LogTransport *self, struct iovec *iov, gint iov_count); void (*free_fn)(LogTransport *self); + /* read ahead */ + struct + { + gchar buf[16]; + gint buf_len; + gint pos; + } ra; + const gchar *name; }; static inline gssize @@ -53,12 +61,21 @@ log_transport_writev(LogTransport *self, struct iovec *iov, gint iov_count) return self->writev(self, iov, iov_count); } +gssize _log_transport_combined_read_with_read_ahead(LogTransport *self, + gpointer buf, gsize count, + LogTransportAuxData *aux); + static inline gssize log_transport_read(LogTransport *self, gpointer buf, gsize count, LogTransportAuxData *aux) { - return self->read(self, buf, count, aux); + if (G_LIKELY(self->ra.buf_len == 0)) + return self->read(self, buf, count, aux); + + return _log_transport_combined_read_with_read_ahead(self, buf, count, aux); } +gssize log_transport_read_ahead(LogTransport *self, gpointer buf, gsize count, gboolean *moved_forward); + void log_transport_init_instance(LogTransport *s, const gchar *name, gint fd); void log_transport_free_method(LogTransport *s); void log_transport_free(LogTransport *s); diff --git a/lib/transport/tests/Makefile.am b/lib/transport/tests/Makefile.am index 09d88a914d..545c8712cf 100644 --- a/lib/transport/tests/Makefile.am +++ b/lib/transport/tests/Makefile.am @@ -1,5 +1,6 @@ lib_transport_tests_TESTS = \ lib/transport/tests/test_aux_data \ + lib/transport/tests/test_transport \ lib/transport/tests/test_transport_stack \ lib/transport/tests/test_transport_haproxy @@ -13,6 +14,12 @@ lib_transport_tests_test_aux_data_LDADD = $(TEST_LDADD) lib_transport_tests_test_aux_data_SOURCES = \ lib/transport/tests/test_aux_data.c +lib_transport_tests_test_transport_CFLAGS = $(TEST_CFLAGS) \ + -I${top_srcdir}/lib/transport/tests +lib_transport_tests_test_transport_LDADD = $(TEST_LDADD) +lib_transport_tests_test_transport_SOURCES = \ + lib/transport/tests/test_transport.c + lib_transport_tests_test_transport_stack_CFLAGS = $(TEST_CFLAGS) \ -I${top_srcdir}/lib/transport/tests lib_transport_tests_test_transport_stack_LDADD = $(TEST_LDADD) diff --git a/lib/transport/tests/test_transport.c b/lib/transport/tests/test_transport.c new file mode 100644 index 0000000000..2ac0053dc0 --- /dev/null +++ b/lib/transport/tests/test_transport.c @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2024 Balazs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include +#include "libtest/mock-transport.h" + +#include "transport/logtransport.h" +#include "apphook.h" + +#include + +Test(transport, test_read_ahead_invokes_only_one_read_operation) +{ + /* this will result in single-byte reads */ + LogTransport *t = log_transport_mock_stream_new("readahead", -1, LTM_EOF); + + gchar buf[12] = {0}; + gboolean moved_forward; + gint rc; + + for (gint i = 1; i <= 9; i++) + { + memset(buf, 0, sizeof(buf)); + rc = log_transport_read_ahead(t, buf, 9, &moved_forward); + cr_assert(moved_forward == TRUE); + cr_assert(rc == i, "unexpected rc = %d", rc); + cr_assert(strncmp(buf, "readahead", i) == 0); + } + rc = log_transport_read_ahead(t, buf, 10, &moved_forward); + cr_assert(rc == 9, "unexpected rc = %d", rc); + cr_assert(moved_forward == FALSE); + + /* the read() returns the bytes that were read in advance */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 9, NULL); + cr_assert(rc == 9, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "readahead"); + +} + +Test(transport, test_read_ahead_bytes_get_shifted_into_the_actual_read) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + + gchar buf[12] = {0}; + gboolean moved_forward; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 4, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + + /* the read() returns the bytes that were read in advance */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 4, NULL); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + +} + +Test(transport, test_read_ahead_bytes_and_new_read_is_combined) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + + gboolean moved_forward; + gchar buf[12] = {0}; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 4, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + + /* NOTE: the mock will return only a single byte for every read to + * exercise retry mechanisms, so only read a single character here */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 5, NULL); + cr_assert(rc == 5, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "reada"); + +} + +Test(transport, test_read_ahead_returns_the_same_buffer_any_number_of_times) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + gboolean moved_forward; + + gchar buf[12]; + + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 1, &moved_forward) == 1); + cr_assert_str_eq(buf, "r"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 2, &moved_forward) == 2); + cr_assert_str_eq(buf, "re"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 8, &moved_forward) == 8); + cr_assert_str_eq(buf, "readahea"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 4, &moved_forward) == 4); + cr_assert_str_eq(buf, "read"); + + /* the read() returns the bytes that were read in advance */ + cr_assert(log_transport_read(t, buf, 9, NULL) == 9); + cr_assert_str_eq(buf, "readahead"); + +} + +Test(transport, test_read_ahead_more_than_the_internal_buffer, .signal = SIGABRT) +{ + LogTransport *t = log_transport_mock_records_new("12345678901234567890", -1, LTM_EOF); + gboolean moved_forward; + + /* 20 bytes, the internal look ahead buffer in LogTransport is 16 bytes which we are overflowing here */ + cr_assert(sizeof(t->ra.buf) == 16); + + gchar buf[32]; + + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 20, &moved_forward) == 20); +} + +Test(transport, test_read_ahead_with_packets_split_in_half) +{ + LogTransport *t = log_transport_mock_records_new("1234", -1, "5678", -1, LTM_EOF); + gboolean moved_forward; + + gchar buf[32]; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 8, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "1234"); + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read_ahead(t, buf, 8, &moved_forward); + cr_assert(rc == 8, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "12345678"); +} + +TestSuite(transport, .init = app_startup, .fini = app_shutdown); From fbd003f6ccd95f21b70c1ba5cf15cc479ef27ba9 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sat, 9 Nov 2024 07:49:31 +0100 Subject: [PATCH 03/13] transport: establish link between LogTransportStack and constituent LogTransport instances Signed-off-by: Balazs Scheidler --- lib/transport/logtransport.h | 27 ++++++++++++++++++++++++- lib/transport/transport-stack.c | 35 ++++++++++++++++++++++++++++++++- lib/transport/transport-stack.h | 3 ++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/lib/transport/logtransport.h b/lib/transport/logtransport.h index 27f143bac0..8f898393aa 100644 --- a/lib/transport/logtransport.h +++ b/lib/transport/logtransport.h @@ -28,8 +28,25 @@ #include "syslog-ng.h" #include "transport/transport-aux-data.h" -typedef struct _LogTransport LogTransport; +/* + * LogTransport: + * + * This is an interface that a LogProto implementation can use to do I/O. + * There might be multiple LogTransport implementations alive for a specific + * connection: for instance we might do both plain text and SSL encrypted + * communication on the same socket, when the haproxy proxy protocol is in + * use and SSL is enabled. It might also make sense to instantiate a + * transport doing gzip compression transparently. + * + * The combination of interoperating LogTransport instances is called the + * LogTransportStack (see transport-stack.h) + * + * There's a circular, borrowed reference between the stack and the + * constituent LogTransport instances. + */ +typedef struct _LogTransport LogTransport; +typedef struct _LogTransportStack LogTransportStack; struct _LogTransport { gint fd; @@ -39,6 +56,7 @@ struct _LogTransport gssize (*write)(LogTransport *self, const gpointer buf, gsize count); gssize (*writev)(LogTransport *self, struct iovec *iov, gint iov_count); void (*free_fn)(LogTransport *self); + /* read ahead */ struct { @@ -46,9 +64,16 @@ struct _LogTransport gint buf_len; gint pos; } ra; + LogTransportStack *stack; const gchar *name; }; +static inline void +log_transport_assign_to_stack(LogTransport *self, LogTransportStack *stack) +{ + self->stack = stack; +} + static inline gssize log_transport_write(LogTransport *self, const gpointer buf, gsize count) { diff --git a/lib/transport/transport-stack.c b/lib/transport/transport-stack.c index 6b3ab04683..bfe1fa58bb 100644 --- a/lib/transport/transport-stack.c +++ b/lib/transport/transport-stack.c @@ -46,10 +46,11 @@ void log_transport_stack_add_transport(LogTransportStack *self, gint index, LogTransport *transport) { g_assert(self->transports[index] == NULL); + log_transport_assign_to_stack(transport, self); self->transports[index] = transport; if (self->fd == -1) self->fd = transport->fd; - else + else if (transport->fd != -1) g_assert(self->fd == transport->fd); } @@ -77,6 +78,38 @@ log_transport_stack_switch(LogTransportStack *self, gint index) return TRUE; } +/* + * Move the transport stack state to another LogTransportStack instance. + * Normally LogTransportStack instances are embedded in LogProto instances, + * so in case the LogProto instance is replaced, the transport stack may + * need to be moved. + */ +void +log_transport_stack_move(LogTransportStack *self, LogTransportStack *other) +{ + self->fd = other->fd; + self->active_transport = other->active_transport; + other->fd = -1; + + for (gint i = 0; i < LOG_TRANSPORT__MAX; i++) + { + g_assert(self->transports[i] == NULL); + g_assert(self->transport_factories[i] == NULL); + + if (other->transports[i]) + { + self->transports[i] = other->transports[i]; + log_transport_assign_to_stack(self->transports[i], self); + other->transports[i] = NULL; + } + if (other->transport_factories[i]) + { + self->transport_factories[i] = other->transport_factories[i]; + other->transport_factories[i] = NULL; + } + } +} + void log_transport_stack_init(LogTransportStack *self, LogTransport *initial_transport) { diff --git a/lib/transport/transport-stack.h b/lib/transport/transport-stack.h index a53fca18ee..9c1035027d 100644 --- a/lib/transport/transport-stack.h +++ b/lib/transport/transport-stack.h @@ -27,7 +27,6 @@ #include "transport/logtransport.h" -typedef struct _LogTransportStack LogTransportStack; typedef struct _LogTransportFactory LogTransportFactory; typedef enum @@ -117,6 +116,7 @@ log_transport_stack_get_transport(LogTransportStack *self, gint index) if (self->transport_factories[index]) { self->transports[index] = log_transport_factory_construct_transport(self->transport_factories[index], self); + log_transport_assign_to_stack(self->transports[index], self); return self->transports[index]; } return NULL; @@ -131,6 +131,7 @@ log_transport_stack_get_active(LogTransportStack *self) void log_transport_stack_add_factory(LogTransportStack *self, LogTransportFactory *); void log_transport_stack_add_transport(LogTransportStack *self, gint index, LogTransport *); gboolean log_transport_stack_switch(LogTransportStack *self, gint index); +void log_transport_stack_move(LogTransportStack *self, LogTransportStack *other); void log_transport_stack_init(LogTransportStack *self, LogTransport *initial_transport); void log_transport_stack_deinit(LogTransportStack *self); From db7c1746225b624a677fa1a0ffa62f4dd56a44fe Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sat, 2 Nov 2024 20:15:25 +0100 Subject: [PATCH 04/13] transport: remove explicit stack argument from LogTransportAdapter Signed-off-by: Balazs Scheidler --- lib/transport/tests/test_transport_haproxy.c | 3 ++- lib/transport/transport-adapter.c | 11 +++++------ lib/transport/transport-adapter.h | 3 +-- lib/transport/transport-haproxy.c | 4 ++-- lib/transport/transport-haproxy.h | 2 +- modules/afsocket/transport-mapper-inet.c | 2 +- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/transport/tests/test_transport_haproxy.c b/lib/transport/tests/test_transport_haproxy.c index 7ec90e3fa2..0af071d301 100644 --- a/lib/transport/tests/test_transport_haproxy.c +++ b/lib/transport/tests/test_transport_haproxy.c @@ -146,7 +146,8 @@ ParameterizedTest(ProtocolHeaderTestParams *params, log_transport_proxy, test_pr gssize rc; log_transport_stack_init(&stack, mock); - LogTransport *transport = log_transport_haproxy_new(&stack, LOG_TRANSPORT_INITIAL, LOG_TRANSPORT_NONE); + LogTransport *transport = log_transport_haproxy_new(LOG_TRANSPORT_INITIAL, LOG_TRANSPORT_NONE); + log_transport_assign_to_stack(transport, &stack); do { diff --git a/lib/transport/transport-adapter.c b/lib/transport/transport-adapter.c index 74f3039ac1..7b1e472e39 100644 --- a/lib/transport/transport-adapter.c +++ b/lib/transport/transport-adapter.c @@ -27,7 +27,7 @@ gssize log_transport_adapter_read_method(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData *aux) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_read(transport, buf, buflen, aux); } @@ -36,7 +36,7 @@ gssize log_transport_adapter_write_method(LogTransport *s, const gpointer buf, gsize count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_write(transport, buf, count); } @@ -45,20 +45,19 @@ gssize log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_writev(transport, iov, iov_count); } void log_transport_adapter_init_instance(LogTransportAdapter *self, const gchar *name, - LogTransportStack *stack, LogTransportIndex base_index) + LogTransportIndex base_index) { - log_transport_init_instance(&self->super, name, stack->fd); + log_transport_init_instance(&self->super, name, -1); self->super.read = log_transport_adapter_read_method; self->super.write = log_transport_adapter_write_method; self->super.writev = log_transport_adapter_writev_method; - self->stack = stack; self->base_index = base_index; } diff --git a/lib/transport/transport-adapter.h b/lib/transport/transport-adapter.h index fbc3122100..6a1e2f0d02 100644 --- a/lib/transport/transport-adapter.h +++ b/lib/transport/transport-adapter.h @@ -30,7 +30,6 @@ typedef struct _LogTransportAdapter LogTransportAdapter; struct _LogTransportAdapter { LogTransport super; - LogTransportStack *stack; LogTransportIndex base_index; }; @@ -39,6 +38,6 @@ gssize log_transport_adapter_write_method(LogTransport *s, const gpointer buf, g gssize log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count); void log_transport_adapter_init_instance(LogTransportAdapter *self, const gchar *name, - LogTransportStack *stack, LogTransportIndex base); + LogTransportIndex base); #endif diff --git a/lib/transport/transport-haproxy.c b/lib/transport/transport-haproxy.c index 95421c9855..23141a0592 100644 --- a/lib/transport/transport-haproxy.c +++ b/lib/transport/transport-haproxy.c @@ -585,11 +585,11 @@ _haproxy_read(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData * } LogTransport * -log_transport_haproxy_new(LogTransportStack *stack, LogTransportIndex base, LogTransportIndex switch_to) +log_transport_haproxy_new(LogTransportIndex base, LogTransportIndex switch_to) { LogTransportHAProxy *self = g_new0(LogTransportHAProxy, 1); - log_transport_adapter_init_instance(&self->super, "haproxy", stack, base); + log_transport_adapter_init_instance(&self->super, "haproxy", base); self->super.super.read = _haproxy_read; self->switch_to = switch_to; diff --git a/lib/transport/transport-haproxy.h b/lib/transport/transport-haproxy.h index 6830df8f80..c689934b7a 100644 --- a/lib/transport/transport-haproxy.h +++ b/lib/transport/transport-haproxy.h @@ -26,6 +26,6 @@ #include "transport-adapter.h" -LogTransport *log_transport_haproxy_new(LogTransportStack *stack, LogTransportIndex base, LogTransportIndex flip); +LogTransport *log_transport_haproxy_new(LogTransportIndex base, LogTransportIndex flip); #endif diff --git a/modules/afsocket/transport-mapper-inet.c b/modules/afsocket/transport-mapper-inet.c index 4552e66084..2982e85ce0 100644 --- a/modules/afsocket/transport-mapper-inet.c +++ b/modules/afsocket/transport-mapper-inet.c @@ -108,7 +108,7 @@ _setup_haproxy_transport(TransportMapperInet *self, LogTransportStack *stack, LogTransportIndex base_index, LogTransportIndex switch_to) { log_transport_stack_add_transport(stack, LOG_TRANSPORT_HAPROXY, - log_transport_haproxy_new(stack, base_index, switch_to)); + log_transport_haproxy_new(base_index, switch_to)); return TRUE; } From 0637d121e1d1e09d2e6fff918fede582498f3226 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 10 Nov 2024 20:54:30 +0100 Subject: [PATCH 05/13] transport-stack: add assertion to log_transport_stack_get_active() Signed-off-by: Balazs Scheidler --- lib/transport/transport-stack.c | 1 + lib/transport/transport-stack.h | 2 ++ 2 files changed, 3 insertions(+) diff --git a/lib/transport/transport-stack.c b/lib/transport/transport-stack.c index bfe1fa58bb..7e75f7a621 100644 --- a/lib/transport/transport-stack.c +++ b/lib/transport/transport-stack.c @@ -57,6 +57,7 @@ log_transport_stack_add_transport(LogTransportStack *self, gint index, LogTransp gboolean log_transport_stack_switch(LogTransportStack *self, gint index) { + g_assert(index < LOG_TRANSPORT__MAX); LogTransport *active_transport = log_transport_stack_get_active(self); LogTransport *requested_transport = log_transport_stack_get_transport(self, index); diff --git a/lib/transport/transport-stack.h b/lib/transport/transport-stack.h index 9c1035027d..a819ae065b 100644 --- a/lib/transport/transport-stack.h +++ b/lib/transport/transport-stack.h @@ -110,6 +110,8 @@ struct _LogTransportStack static inline LogTransport * log_transport_stack_get_transport(LogTransportStack *self, gint index) { + g_assert(index < LOG_TRANSPORT__MAX); + if (self->transports[index]) return self->transports[index]; From 7a3a4a64217c8c0e4ddd57b20734d0a734eb3779 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Fri, 8 Nov 2024 21:05:20 +0100 Subject: [PATCH 06/13] transport-stack: indicate failure in log_transport_stack_switch() Signed-off-by: Balazs Scheidler --- lib/transport/transport-stack.c | 3 ++- modules/afsocket/transport-mapper-inet.c | 3 ++- modules/afsocket/transport-mapper-unix.c | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/transport/transport-stack.c b/lib/transport/transport-stack.c index 7e75f7a621..2c505b1916 100644 --- a/lib/transport/transport-stack.c +++ b/lib/transport/transport-stack.c @@ -61,7 +61,8 @@ log_transport_stack_switch(LogTransportStack *self, gint index) LogTransport *active_transport = log_transport_stack_get_active(self); LogTransport *requested_transport = log_transport_stack_get_transport(self, index); - g_assert(requested_transport != NULL); + if (!requested_transport) + return FALSE; msg_debug("Transport switch requested", evt_tag_str("active-transport", active_transport ? active_transport->name : "none"), diff --git a/modules/afsocket/transport-mapper-inet.c b/modules/afsocket/transport-mapper-inet.c index 2982e85ce0..cc05b69a86 100644 --- a/modules/afsocket/transport-mapper-inet.c +++ b/modules/afsocket/transport-mapper-inet.c @@ -143,7 +143,8 @@ transport_mapper_inet_setup_stack(TransportMapper *s, LogTransportStack *stack) initial_transport_index = LOG_TRANSPORT_HAPROXY; } - log_transport_stack_switch(stack, initial_transport_index); + if (!log_transport_stack_switch(stack, initial_transport_index)) + g_assert_not_reached(); return TRUE; } diff --git a/modules/afsocket/transport-mapper-unix.c b/modules/afsocket/transport-mapper-unix.c index 13c1ee86a9..16fb86409a 100644 --- a/modules/afsocket/transport-mapper-unix.c +++ b/modules/afsocket/transport-mapper-unix.c @@ -43,7 +43,8 @@ _setup_stack(TransportMapper *s, LogTransportStack *stack) else transport = log_transport_unix_stream_socket_new(stack->fd); log_transport_stack_add_transport(stack, LOG_TRANSPORT_SOCKET, transport); - log_transport_stack_switch(stack, LOG_TRANSPORT_SOCKET); + if (!log_transport_stack_switch(stack, LOG_TRANSPORT_SOCKET)) + g_assert_not_reached(); return TRUE; } From 95285f19f99ea4a4c2c64b9d7aac8ff6fa8fab20 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sat, 3 Feb 2024 15:48:17 +0100 Subject: [PATCH 07/13] logproto: add LogProtoAutoServer implementation Signed-off-by: Balazs Scheidler --- lib/logproto/CMakeLists.txt | 2 + lib/logproto/Makefile.am | 2 + lib/logproto/logproto-auto-server.c | 147 ++++++++++++++++++++++++++ lib/logproto/logproto-auto-server.h | 30 ++++++ lib/logproto/tests/CMakeLists.txt | 1 + lib/logproto/tests/Makefile.am | 1 + lib/logproto/tests/test-auto-server.c | 139 ++++++++++++++++++++++++ libtest/proto_lib.c | 38 +++++++ libtest/proto_lib.h | 3 + 9 files changed, 363 insertions(+) create mode 100644 lib/logproto/logproto-auto-server.c create mode 100644 lib/logproto/logproto-auto-server.h create mode 100644 lib/logproto/tests/test-auto-server.c diff --git a/lib/logproto/CMakeLists.txt b/lib/logproto/CMakeLists.txt index 7348b63824..7c9de7dcd4 100644 --- a/lib/logproto/CMakeLists.txt +++ b/lib/logproto/CMakeLists.txt @@ -11,6 +11,7 @@ set(LOGPROTO_HEADERS logproto/logproto-server.h logproto/logproto-text-client.h logproto/logproto-text-server.h + logproto/logproto-auto-server.h PARENT_SCOPE) set(LOGPROTO_SOURCES @@ -25,6 +26,7 @@ set(LOGPROTO_SOURCES logproto/logproto-server.c logproto/logproto-text-client.c logproto/logproto-text-server.c + logproto/logproto-auto-server.c PARENT_SCOPE) add_test_subdirectory(tests) diff --git a/lib/logproto/Makefile.am b/lib/logproto/Makefile.am index da02d8f723..0fa2b1839c 100644 --- a/lib/logproto/Makefile.am +++ b/lib/logproto/Makefile.am @@ -11,6 +11,7 @@ logprotoinclude_HEADERS = \ lib/logproto/logproto-framed-server.h \ lib/logproto/logproto-text-client.h \ lib/logproto/logproto-text-server.h \ + lib/logproto/logproto-auto-server.h \ lib/logproto/logproto-multiline-server.h \ lib/logproto/logproto-record-server.h \ lib/logproto/logproto-builtins.h \ @@ -25,6 +26,7 @@ logproto_sources = \ lib/logproto/logproto-framed-server.c \ lib/logproto/logproto-text-client.c \ lib/logproto/logproto-text-server.c \ + lib/logproto/logproto-auto-server.c \ lib/logproto/logproto-multiline-server.c \ lib/logproto/logproto-record-server.c \ lib/logproto/logproto-builtins.c diff --git a/lib/logproto/logproto-auto-server.c b/lib/logproto/logproto-auto-server.c new file mode 100644 index 0000000000..a272c038d5 --- /dev/null +++ b/lib/logproto/logproto-auto-server.c @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ +#include "logproto-auto-server.h" +#include "logproto-text-server.h" +#include "logproto-framed-server.h" +#include "messages.h" + +typedef struct _LogProtoAutoServer +{ + LogProtoServer super; + + /* the actual LogProto instance that we run after auto-detecting the protocol */ + LogProtoServer *proto_impl; + LogTransport *transport; +} LogProtoAutoServer; + +static LogProtoServer * +_construct_detected_proto(LogProtoAutoServer *self, const gchar *detect_buffer, gsize detect_buffer_len) +{ + LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); + + if (g_ascii_isdigit(detect_buffer[0])) + { + msg_debug("Auto-detected octet-counted-framing on RFC6587 connection, using framed protocol", + evt_tag_int("fd", transport->fd)); + return log_proto_framed_server_new(NULL, self->super.options); + } + if (detect_buffer[0] == '<') + { + msg_debug("Auto-detected non-transparent-framing on RFC6587 connection, using simple text protocol", + evt_tag_int("fd", transport->fd)); + } + else + { + msg_debug("Unable to detect framing on RFC6587 connection, falling back to simple text transport", + evt_tag_int("fd", transport->fd), + evt_tag_mem("detect_buffer", detect_buffer, detect_buffer_len)); + } + return log_proto_text_server_new(NULL, self->super.options); +} + +static LogProtoPrepareAction +log_proto_auto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); + + if (self->proto_impl) + return log_proto_server_prepare(self->proto_impl, cond, timeout); + + *cond = transport->cond; + if (*cond == 0) + *cond = G_IO_IN; + + return LPPA_POLL_IO; +} + +static LogProtoStatus +log_proto_auto_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read, + LogTransportAuxData *aux, Bookmark *bookmark) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + + if (self->proto_impl) + return log_proto_server_fetch(self->proto_impl, msg, msg_len, may_read, aux, bookmark); + + g_assert_not_reached(); +} + +static LogProtoStatus +log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); + /* allow the impl to do its handshake */ + if (self->proto_impl) + return log_proto_server_handshake(self->proto_impl, handshake_finished); + + gchar detect_buffer[8]; + gboolean moved_forward; + gint rc; + + rc = log_transport_read_ahead(transport, detect_buffer, sizeof(detect_buffer), &moved_forward); + if (rc == 0) + return LPS_EOF; + else if (rc < 0) + { + if (errno == EAGAIN) + return LPS_AGAIN; + return LPS_ERROR; + } + + self->proto_impl = _construct_detected_proto(self, detect_buffer, rc); + if (self->proto_impl) + { + /* transport is handed over to the new proto */ + log_transport_stack_move(&self->proto_impl->transport_stack, &self->super.transport_stack); + return LPS_SUCCESS; + } + return LPS_ERROR; +} + +static void +log_proto_auto_server_free(LogProtoServer *s) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + + if (self->proto_impl) + log_proto_server_free(self->proto_impl); + log_proto_server_free_method(s); +} + +LogProtoServer * +log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options) +{ + LogProtoAutoServer *self = g_new0(LogProtoAutoServer, 1); + + /* we are not using our own transport stack, transport is to be passed to + * the LogProto implementation once we finished with detection */ + + log_proto_server_init(&self->super, transport, options); + self->super.handshake = log_proto_auto_handshake; + self->super.prepare = log_proto_auto_server_prepare; + self->super.fetch = log_proto_auto_server_fetch; + self->super.free_fn = log_proto_auto_server_free; + return &self->super; +} diff --git a/lib/logproto/logproto-auto-server.h b/lib/logproto/logproto-auto-server.h new file mode 100644 index 0000000000..0872cd66e8 --- /dev/null +++ b/lib/logproto/logproto-auto-server.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ +#ifndef LOGPROTO_AUTO_SERVER_H_INCLUDED +#define LOGPROTO_AUTO_SERVER_H_INCLUDED + +#include "logproto-server.h" + +LogProtoServer *log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options); + +#endif diff --git a/lib/logproto/tests/CMakeLists.txt b/lib/logproto/tests/CMakeLists.txt index c94913854a..018cd41be6 100644 --- a/lib/logproto/tests/CMakeLists.txt +++ b/lib/logproto/tests/CMakeLists.txt @@ -5,6 +5,7 @@ set(TEST_LOGPROTO_SOURCES test-text-server.c test-dgram-server.c test-framed-server.c + test-auto-server.c test-indented-multiline-server.c test-regexp-multiline-server.c) diff --git a/lib/logproto/tests/Makefile.am b/lib/logproto/tests/Makefile.am index 8dcb10a91d..b334a7094c 100644 --- a/lib/logproto/tests/Makefile.am +++ b/lib/logproto/tests/Makefile.am @@ -17,6 +17,7 @@ lib_logproto_tests_test_logproto_SOURCES = \ lib/logproto/tests/test-text-server.c \ lib/logproto/tests/test-dgram-server.c \ lib/logproto/tests/test-framed-server.c \ + lib/logproto/tests/test-auto-server.c \ lib/logproto/tests/test-indented-multiline-server.c \ lib/logproto/tests/test-regexp-multiline-server.c diff --git a/lib/logproto/tests/test-auto-server.c b/lib/logproto/tests/test-auto-server.c new file mode 100644 index 0000000000..47ec821587 --- /dev/null +++ b/lib/logproto/tests/test-auto-server.c @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2012-2019 Balabit + * Copyright (c) 2012-2013 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include +#include "libtest/mock-transport.h" +#include "libtest/proto_lib.h" +#include "libtest/msg_parse_lib.h" + +#include "logproto/logproto-auto-server.h" + +#include + +Test(log_proto, test_log_proto_initial_framing_too_long) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "100000000 too long\n", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake_failure(proto, LPS_SUCCESS); + assert_proto_server_fetch_failure(proto, LPS_ERROR, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_error_in_initial_frame) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + LTM_INJECT_ERROR(EIO)), + get_inited_proto_server_options()); + + assert_proto_server_handshake_failure(proto, LPS_ERROR); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_no_framing) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "abcdefghijklmnopqstuvwxyz\n", -1, + "01234567\n", -1, + "01234567\0", 9, + "abcdef", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(proto); + assert_proto_server_fetch(proto, "abcdefghijklmnopqstuvwxyz", -1); + assert_proto_server_fetch(proto, "01234567", -1); + assert_proto_server_fetch(proto, "01234567", 8); + assert_proto_server_fetch(proto, "abcdef", -1); + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_opening_bracket) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "<55> abcdefghijklmnopqstuvwxyz\n", -1, + "01234567\n", -1, + "01234567\0", 9, + "abcdef", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(proto); + assert_proto_server_fetch(proto, "<55> abcdefghijklmnopqstuvwxyz", -1); + assert_proto_server_fetch(proto, "01234567", -1); + assert_proto_server_fetch(proto, "01234567", 8); + assert_proto_server_fetch(proto, "abcdef", -1); + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_with_framing) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "32 0123456789ABCDEF0123456789ABCDEF", -1, + "10 01234567\n\n", -1, + "10 01234567\0\0", 13, + /* utf8 */ + "30 árvíztűrőtükörfúrógép", -1, + /* iso-8859-2 */ + "21 \xe1\x72\x76\xed\x7a\x74\xfb\x72\xf5\x74\xfc\x6b\xf6\x72\x66\xfa" /* |árvíztűrőtükörfú| */ + "\x72\xf3\x67\xe9\x70", -1, /* |rógép| */ + /* ucs4 */ + "32 \x00\x00\x00\xe1\x00\x00\x00\x72\x00\x00\x00\x76\x00\x00\x00\xed" /* |...á...r...v...í| */ + "\x00\x00\x00\x7a\x00\x00\x00\x74\x00\x00\x01\x71\x00\x00\x00\x72", 35, /* |...z...t...ű...r| */ + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(proto); + assert_proto_server_fetch(proto, "0123456789ABCDEF0123456789ABCDEF", -1); + assert_proto_server_fetch(proto, "01234567\n\n", -1); + assert_proto_server_fetch(proto, "01234567\0\0", 10); + assert_proto_server_fetch(proto, "árvíztűrőtükörfúrógép", -1); + assert_proto_server_fetch(proto, + "\xe1\x72\x76\xed\x7a\x74\xfb\x72\xf5\x74\xfc\x6b\xf6\x72\x66\xfa" /* |.rv.zt.r.t.k.rf.| */ + "\x72\xf3\x67\xe9\x70", -1); /* |r.g.p| */ + assert_proto_server_fetch(proto, + "\x00\x00\x00\xe1\x00\x00\x00\x72\x00\x00\x00\x76\x00\x00\x00\xed" /* |...á...r...v...í| */ + "\x00\x00\x00\x7a\x00\x00\x00\x74\x00\x00\x01\x71\x00\x00\x00\x72", 32); /* |...z...t...q...r| */ + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} diff --git a/libtest/proto_lib.c b/libtest/proto_lib.c index ecbdf22fc8..b5d92303ee 100644 --- a/libtest/proto_lib.c +++ b/libtest/proto_lib.c @@ -37,6 +37,24 @@ assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProt cr_assert_eq(status, expected_status, "LogProtoServer expected status mismatch"); } +LogProtoStatus +proto_server_handshake(LogProtoServer *proto) +{ + gboolean handshake_finished = FALSE; + LogProtoStatus status; + + start_grabbing_messages(); + do + { + status = log_proto_server_handshake(proto, &handshake_finished); + if (status == LPS_AGAIN) + status = LPS_SUCCESS; + } + while (status == LPS_SUCCESS && handshake_finished == FALSE); + stop_grabbing_messages(); + return status; +} + LogProtoStatus proto_server_fetch(LogProtoServer *proto, const guchar **msg, gsize *msg_len) { @@ -80,6 +98,16 @@ construct_server_proto_plugin(const gchar *name, LogTransport *transport) return log_proto_server_factory_construct(proto_factory, transport, &proto_server_options); } +void +assert_proto_server_handshake(LogProtoServer *proto) +{ + LogProtoStatus status; + + status = proto_server_handshake(proto); + + assert_proto_server_status(proto, status, LPS_SUCCESS); +} + void assert_proto_server_fetch(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len) { @@ -146,6 +174,16 @@ assert_proto_server_fetch_failure(LogProtoServer *proto, LogProtoStatus expected assert_grabbed_log_contains(error_message); } +void +assert_proto_server_handshake_failure(LogProtoServer *proto, LogProtoStatus expected_status) +{ + LogProtoStatus status; + + status = proto_server_handshake(proto); + + assert_proto_server_status(proto, status, expected_status); +} + void assert_proto_server_fetch_ignored_eof(LogProtoServer *proto) { diff --git a/libtest/proto_lib.h b/libtest/proto_lib.h index 357a7d2fe8..ee87a3c9e4 100644 --- a/libtest/proto_lib.h +++ b/libtest/proto_lib.h @@ -29,6 +29,9 @@ extern LogProtoServerOptions proto_server_options; + +void assert_proto_server_handshake(LogProtoServer *proto); +void assert_proto_server_handshake_failure(LogProtoServer *proto, LogProtoStatus expected_status); void assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProtoStatus expected_status); void assert_proto_server_fetch(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); void assert_proto_server_fetch_single_read(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); From a68c3af0a32676c2076f0aaa495589a80f478982 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 25 Apr 2024 21:11:35 +0200 Subject: [PATCH 08/13] syslog: use rfc6587 style auto-detection when transport("auto") is selected Signed-off-by: Balazs Scheidler --- lib/logproto/logproto-builtins.c | 3 +++ modules/afsocket/afsocket-grammar.ym | 1 + 2 files changed, 4 insertions(+) diff --git a/lib/logproto/logproto-builtins.c b/lib/logproto/logproto-builtins.c index d21eb1d1d1..e76b23769e 100644 --- a/lib/logproto/logproto-builtins.c +++ b/lib/logproto/logproto-builtins.c @@ -26,6 +26,7 @@ #include "logproto-text-server.h" #include "logproto-framed-client.h" #include "logproto-framed-server.h" +#include "logproto-auto-server.h" #include "plugin.h" #include "plugin-types.h" @@ -39,6 +40,7 @@ DEFINE_LOG_PROTO_SERVER(log_proto_text); DEFINE_LOG_PROTO_SERVER(log_proto_text_with_nuls); DEFINE_LOG_PROTO_CLIENT(log_proto_framed); DEFINE_LOG_PROTO_SERVER(log_proto_framed); +DEFINE_LOG_PROTO_SERVER(log_proto_auto); static Plugin framed_server_plugins[] = { @@ -50,6 +52,7 @@ static Plugin framed_server_plugins[] = LOG_PROTO_SERVER_PLUGIN(log_proto_text_with_nuls, "text-with-nuls"), LOG_PROTO_CLIENT_PLUGIN(log_proto_framed, "framed"), LOG_PROTO_SERVER_PLUGIN(log_proto_framed, "framed"), + LOG_PROTO_SERVER_PLUGIN(log_proto_auto, "auto"), }; void diff --git a/modules/afsocket/afsocket-grammar.ym b/modules/afsocket/afsocket-grammar.ym index d26ea8bd73..fefe3d845a 100644 --- a/modules/afsocket/afsocket-grammar.ym +++ b/modules/afsocket/afsocket-grammar.ym @@ -772,6 +772,7 @@ afsocket_transport : KW_TRANSPORT '(' KW_TCP ')' { transport_mapper_set_transport(last_transport_mapper, "tcp"); } | KW_TRANSPORT '(' KW_UDP ')' { transport_mapper_set_transport(last_transport_mapper, "udp"); } | KW_TRANSPORT '(' KW_TLS ')' { transport_mapper_set_transport(last_transport_mapper, "tls"); } + | KW_TRANSPORT '(' KW_AUTO ')' { transport_mapper_set_transport(last_transport_mapper, "auto"); } | KW_IP_PROTOCOL '(' inet_ip_protocol_option ')' { transport_mapper_set_address_family(last_transport_mapper, $3); } ; From b0b3a4910adc15a02e497b41a50b212af79ca443 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 20 Oct 2024 16:54:40 +0200 Subject: [PATCH 09/13] libtest: allow LogProtoServer instance to change during handshake This is just the change in test code, which adds LogProtoServer ** arguments to the handshake related functions, in order to allow log_proto_server_handshake to change the LogProto instance. Signed-off-by: Balazs Scheidler --- lib/logproto/tests/test-auto-server.c | 10 +++++----- libtest/proto_lib.c | 12 ++++++------ libtest/proto_lib.h | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/logproto/tests/test-auto-server.c b/lib/logproto/tests/test-auto-server.c index 47ec821587..c697d5495f 100644 --- a/lib/logproto/tests/test-auto-server.c +++ b/lib/logproto/tests/test-auto-server.c @@ -41,7 +41,7 @@ Test(log_proto, test_log_proto_initial_framing_too_long) LTM_EOF), get_inited_proto_server_options()); - assert_proto_server_handshake_failure(proto, LPS_SUCCESS); + assert_proto_server_handshake_failure(&proto, LPS_SUCCESS); assert_proto_server_fetch_failure(proto, LPS_ERROR, NULL); log_proto_server_free(proto); } @@ -55,7 +55,7 @@ Test(log_proto, test_log_proto_error_in_initial_frame) LTM_INJECT_ERROR(EIO)), get_inited_proto_server_options()); - assert_proto_server_handshake_failure(proto, LPS_ERROR); + assert_proto_server_handshake_failure(&proto, LPS_ERROR); log_proto_server_free(proto); } @@ -72,7 +72,7 @@ Test(log_proto, test_log_proto_auto_server_no_framing) LTM_EOF), get_inited_proto_server_options()); - assert_proto_server_handshake(proto); + assert_proto_server_handshake(&proto); assert_proto_server_fetch(proto, "abcdefghijklmnopqstuvwxyz", -1); assert_proto_server_fetch(proto, "01234567", -1); assert_proto_server_fetch(proto, "01234567", 8); @@ -94,7 +94,7 @@ Test(log_proto, test_log_proto_auto_server_opening_bracket) LTM_EOF), get_inited_proto_server_options()); - assert_proto_server_handshake(proto); + assert_proto_server_handshake(&proto); assert_proto_server_fetch(proto, "<55> abcdefghijklmnopqstuvwxyz", -1); assert_proto_server_fetch(proto, "01234567", -1); assert_proto_server_fetch(proto, "01234567", 8); @@ -123,7 +123,7 @@ Test(log_proto, test_log_proto_auto_server_with_framing) LTM_EOF), get_inited_proto_server_options()); - assert_proto_server_handshake(proto); + assert_proto_server_handshake(&proto); assert_proto_server_fetch(proto, "0123456789ABCDEF0123456789ABCDEF", -1); assert_proto_server_fetch(proto, "01234567\n\n", -1); assert_proto_server_fetch(proto, "01234567\0\0", 10); diff --git a/libtest/proto_lib.c b/libtest/proto_lib.c index b5d92303ee..ca49658197 100644 --- a/libtest/proto_lib.c +++ b/libtest/proto_lib.c @@ -38,7 +38,7 @@ assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProt } LogProtoStatus -proto_server_handshake(LogProtoServer *proto) +proto_server_handshake(LogProtoServer **proto) { gboolean handshake_finished = FALSE; LogProtoStatus status; @@ -46,7 +46,7 @@ proto_server_handshake(LogProtoServer *proto) start_grabbing_messages(); do { - status = log_proto_server_handshake(proto, &handshake_finished); + status = log_proto_server_handshake(*proto, &handshake_finished); if (status == LPS_AGAIN) status = LPS_SUCCESS; } @@ -99,13 +99,13 @@ construct_server_proto_plugin(const gchar *name, LogTransport *transport) } void -assert_proto_server_handshake(LogProtoServer *proto) +assert_proto_server_handshake(LogProtoServer **proto) { LogProtoStatus status; status = proto_server_handshake(proto); - assert_proto_server_status(proto, status, LPS_SUCCESS); + assert_proto_server_status(*proto, status, LPS_SUCCESS); } void @@ -175,13 +175,13 @@ assert_proto_server_fetch_failure(LogProtoServer *proto, LogProtoStatus expected } void -assert_proto_server_handshake_failure(LogProtoServer *proto, LogProtoStatus expected_status) +assert_proto_server_handshake_failure(LogProtoServer **proto, LogProtoStatus expected_status) { LogProtoStatus status; status = proto_server_handshake(proto); - assert_proto_server_status(proto, status, expected_status); + assert_proto_server_status(*proto, status, expected_status); } void diff --git a/libtest/proto_lib.h b/libtest/proto_lib.h index ee87a3c9e4..5b75874e38 100644 --- a/libtest/proto_lib.h +++ b/libtest/proto_lib.h @@ -30,8 +30,8 @@ extern LogProtoServerOptions proto_server_options; -void assert_proto_server_handshake(LogProtoServer *proto); -void assert_proto_server_handshake_failure(LogProtoServer *proto, LogProtoStatus expected_status); +void assert_proto_server_handshake(LogProtoServer **proto); +void assert_proto_server_handshake_failure(LogProtoServer **proto, LogProtoStatus expected_status); void assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProtoStatus expected_status); void assert_proto_server_fetch(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); void assert_proto_server_fetch_single_read(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); From ba0dac70677db500fc77c1d6e38f04a3b322dfef Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 20 Oct 2024 17:57:24 +0200 Subject: [PATCH 10/13] logreader: allow replacing LogProtoServer instances during handshake This can be used to simplify LogProtoAutoServer. Signed-off-by: Balazs Scheidler --- lib/logproto/logproto-auto-server.c | 47 ++--------------------------- lib/logproto/logproto-server.h | 14 +++++++-- lib/logreader.c | 11 ++++++- libtest/proto_lib.c | 9 +++++- 4 files changed, 32 insertions(+), 49 deletions(-) diff --git a/lib/logproto/logproto-auto-server.c b/lib/logproto/logproto-auto-server.c index a272c038d5..d35ad5c15f 100644 --- a/lib/logproto/logproto-auto-server.c +++ b/lib/logproto/logproto-auto-server.c @@ -28,10 +28,6 @@ typedef struct _LogProtoAutoServer { LogProtoServer super; - - /* the actual LogProto instance that we run after auto-detecting the protocol */ - LogProtoServer *proto_impl; - LogTransport *transport; } LogProtoAutoServer; static LogProtoServer * @@ -65,9 +61,6 @@ log_proto_auto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeo LogProtoAutoServer *self = (LogProtoAutoServer *) s; LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - if (self->proto_impl) - return log_proto_server_prepare(self->proto_impl, cond, timeout); - *cond = transport->cond; if (*cond == 0) *cond = G_IO_IN; @@ -76,26 +69,10 @@ log_proto_auto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeo } static LogProtoStatus -log_proto_auto_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read, - LogTransportAuxData *aux, Bookmark *bookmark) -{ - LogProtoAutoServer *self = (LogProtoAutoServer *) s; - - if (self->proto_impl) - return log_proto_server_fetch(self->proto_impl, msg, msg_len, may_read, aux, bookmark); - - g_assert_not_reached(); -} - -static LogProtoStatus -log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished) +log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement) { LogProtoAutoServer *self = (LogProtoAutoServer *) s; LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - /* allow the impl to do its handshake */ - if (self->proto_impl) - return log_proto_server_handshake(self->proto_impl, handshake_finished); - gchar detect_buffer[8]; gboolean moved_forward; gint rc; @@ -110,24 +87,8 @@ log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished) return LPS_ERROR; } - self->proto_impl = _construct_detected_proto(self, detect_buffer, rc); - if (self->proto_impl) - { - /* transport is handed over to the new proto */ - log_transport_stack_move(&self->proto_impl->transport_stack, &self->super.transport_stack); - return LPS_SUCCESS; - } - return LPS_ERROR; -} - -static void -log_proto_auto_server_free(LogProtoServer *s) -{ - LogProtoAutoServer *self = (LogProtoAutoServer *) s; - - if (self->proto_impl) - log_proto_server_free(self->proto_impl); - log_proto_server_free_method(s); + *proto_replacement = _construct_detected_proto(self, detect_buffer, rc); + return LPS_SUCCESS; } LogProtoServer * @@ -141,7 +102,5 @@ log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions * log_proto_server_init(&self->super, transport, options); self->super.handshake = log_proto_auto_handshake; self->super.prepare = log_proto_auto_server_prepare; - self->super.fetch = log_proto_auto_server_fetch; - self->super.free_fn = log_proto_auto_server_free; return &self->super; } diff --git a/lib/logproto/logproto-server.h b/lib/logproto/logproto-server.h index 76904143cc..5c7d123453 100644 --- a/lib/logproto/logproto-server.h +++ b/lib/logproto/logproto-server.h @@ -89,7 +89,7 @@ struct _LogProtoServer LogProtoStatus (*fetch)(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read, LogTransportAuxData *aux, Bookmark *bookmark); gboolean (*validate_options)(LogProtoServer *s); - LogProtoStatus (*handshake)(LogProtoServer *s, gboolean *handshake_finished); + LogProtoStatus (*handshake)(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement); void (*free_fn)(LogProtoServer *s); }; @@ -100,11 +100,19 @@ log_proto_server_validate_options(LogProtoServer *self) } static inline LogProtoStatus -log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished) +log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement) { if (s->handshake) { - return s->handshake(s, handshake_finished); + LogProtoStatus status; + + g_assert(*proto_replacement == NULL); + status = s->handshake(s, handshake_finished, proto_replacement); + if (*proto_replacement) + { + g_assert(status == LPS_SUCCESS || status == LPS_AGAIN); + } + return status; } *handshake_finished = TRUE; return LPS_SUCCESS; diff --git a/lib/logreader.c b/lib/logreader.c index 0001912b15..f59a28c075 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -426,7 +426,16 @@ static inline gint log_reader_process_handshake(LogReader *self) { gboolean handshake_finished = FALSE; - LogProtoStatus status = log_proto_server_handshake(self->proto, &handshake_finished); + LogProtoServer *proto_replacement = NULL; + LogProtoStatus status = log_proto_server_handshake(self->proto, &handshake_finished, &proto_replacement); + + if (proto_replacement) + { + g_assert(handshake_finished == FALSE); + log_transport_stack_move(&proto_replacement->transport_stack, &self->proto->transport_stack); + log_proto_server_free(self->proto); + self->proto = proto_replacement; + } switch (status) { diff --git a/libtest/proto_lib.c b/libtest/proto_lib.c index ca49658197..3e796123c8 100644 --- a/libtest/proto_lib.c +++ b/libtest/proto_lib.c @@ -46,9 +46,16 @@ proto_server_handshake(LogProtoServer **proto) start_grabbing_messages(); do { - status = log_proto_server_handshake(*proto, &handshake_finished); + LogProtoServer *proto_replacement = NULL; + status = log_proto_server_handshake(*proto, &handshake_finished, &proto_replacement); if (status == LPS_AGAIN) status = LPS_SUCCESS; + if (proto_replacement) + { + log_transport_stack_move(&proto_replacement->transport_stack, &(*proto)->transport_stack); + log_proto_server_free(*proto); + *proto = proto_replacement; + } } while (status == LPS_SUCCESS && handshake_finished == FALSE); stop_grabbing_messages(); From 0c47635f772bda8523043f4a3ad261cd09611fa6 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 20 Oct 2024 18:39:32 +0200 Subject: [PATCH 11/13] light: add testcase for framing auto detection Signed-off-by: Balazs Scheidler --- .../syslog_source/auto/test_auto_proto.py | 73 +++++++++++++++++++ .../statements/sources/syslog_source.py | 1 + 2 files changed, 74 insertions(+) create mode 100644 tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py diff --git a/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py b/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py new file mode 100644 index 0000000000..3ea20e13cc --- /dev/null +++ b/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +############################################################################# +# Copyright (c) 2024 Balazs Scheidler +# Copyright (c) 2024 Axoflow +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# As an additional exemption you are allowed to compile & link against the +# OpenSSL libraries as published by the OpenSSL project. See the file +# COPYING for details. +# +############################################################################# +from pathlib import Path + +from src.common.blocking import wait_until_true +from src.common.file import File +from src.common.random_id import get_unique_id + + +def _test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, transport, input_messages, number_of_messages, expected_messages): + output_file = "output.log" + + syslog_source = config.create_syslog_source( + ip="localhost", + port=port_allocator(), + keep_hostname="yes", + transport=transport, + ) + file_destination = config.create_file_destination(file_name=output_file) + config.create_logpath(statements=[syslog_source, file_destination]) + + syslog_ng.start(config) + + loggen_input_file_path = Path("loggen_input_{}.txt".format(get_unique_id())) + loggen_input_file = File(loggen_input_file_path) + loggen_input_file.write_content_and_close(input_messages) + loggen.start( + syslog_source.options["ip"], syslog_source.options["port"], + number=number_of_messages, + dont_parse=True, + read_file=str(loggen_input_file_path), + syslog_proto=True, + inet=True, + ) + + wait_until_true(lambda: loggen.get_sent_message_count() == number_of_messages) + + assert file_destination.read_log() == expected_messages + + +def test_auto_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" * 10 + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + NUMBER_OF_MESSAGES = 10 + _test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"auto"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES) + + +def test_auto_no_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + INPUT_MESSAGES = "<2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" * 10 + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + NUMBER_OF_MESSAGES = 10 + _test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"auto"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES) diff --git a/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py b/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py index 20a43d9e1e..be6faf33d5 100644 --- a/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py +++ b/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py @@ -26,6 +26,7 @@ def map_transport(transport): mapping = { + "auto": NetworkIO.Transport.TCP, "tcp": NetworkIO.Transport.TCP, "udp": NetworkIO.Transport.UDP, "tls": NetworkIO.Transport.TLS, From 61f4af09a63ae21e58c24389ef14507a2ec2c522 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sat, 3 Feb 2024 16:40:23 +0100 Subject: [PATCH 12/13] news: updated news entry Signed-off-by: Balazs Scheidler --- news/feature-4814.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 news/feature-4814.md diff --git a/news/feature-4814.md b/news/feature-4814.md new file mode 100644 index 0000000000..d8b48cba9c --- /dev/null +++ b/news/feature-4814.md @@ -0,0 +1,4 @@ +`syslog()` source driver: add support for RFC6587 style auto-detection of +octet-count based framing to avoid confusion that stems from the sender +using a different protocol to the server. This behaviour can be enabled +by using `transport(auto)` option for the `syslog()` source. From 25d68340a88fe374f2e49db2486c63473f696ed5 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 20 Oct 2024 19:34:00 +0200 Subject: [PATCH 13/13] tests/copyright: updated policy about new files Signed-off-by: Balazs Scheidler --- tests/copyright/policy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/copyright/policy b/tests/copyright/policy index 6e638b3707..caeb3e1ebf 100644 --- a/tests/copyright/policy +++ b/tests/copyright/policy @@ -136,6 +136,8 @@ modules/python/python-confgen\.[ch] lib/tests/test_logscheduler\.c lib/filterx/.*\.[ch] lib/filterx/filterx-grammar\.ym +lib/logproto/logproto-auto-server\.[ch] +lib/transport/tests/test_transport\.c ########################################################################### # These tests are GPLd even though they reside under lib/ and are excluded @@ -271,6 +273,7 @@ tests/light/functional_tests/filterx/test_filterx\.py tests/light/functional_tests/filterx/test_filterx_scope\.py tests/light/functional_tests/filterx/test_filterx_update_metric\.py tests/light/functional_tests/parsers/metrics-probe/test_metrics_probe\.py +tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto\.py tests/light/src/syslog_ng_ctl/prometheus_stats_handler.py tests/light/src/syslog_ng_config/statements/template/template\.py tests/light/src/syslog_ng_config/statements/__init__\.py