diff --git a/lib/driver.c b/lib/driver.c index c16de990a2..1481879dfe 100644 --- a/lib/driver.c +++ b/lib/driver.c @@ -142,7 +142,7 @@ static void log_driver_init_instance(LogDriver *self, GlobalConfig *cfg) { log_pipe_init_instance(&self->super, cfg); - self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX; + self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG; self->super.free_fn = log_driver_free; self->super.pre_init = log_driver_pre_init_method; self->super.init = log_driver_init_method; diff --git a/lib/filter/filter-pipe.c b/lib/filter/filter-pipe.c index 550d42a9f2..22e7b06c80 100644 --- a/lib/filter/filter-pipe.c +++ b/lib/filter/filter-pipe.c @@ -122,7 +122,7 @@ log_filter_pipe_new(FilterExprNode *expr, GlobalConfig *cfg) LogFilterPipe *self = g_new0(LogFilterPipe, 1); log_pipe_init_instance(&self->super, cfg); - self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX; + self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG; self->super.init = log_filter_pipe_init; self->super.queue = log_filter_pipe_queue; self->super.free_fn = log_filter_pipe_free; diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index 7fb0fa34a4..cc4ccea9eb 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -56,6 +56,9 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o path_options = log_path_options_chain(&local_path_options, path_options); filterx_eval_init_context(&eval_context, path_options->filterx_context); + if (filterx_scope_has_log_msg_changes(eval_context.scope)) + filterx_scope_invalidate_log_msg_cache(eval_context.scope); + msg_trace(">>>>>> filterx rule evaluation begin", evt_tag_str("rule", self->name), log_pipe_location_tag(s), diff --git a/lib/filterx/filterx-scope.c b/lib/filterx/filterx-scope.c index 6c95fd9b93..c240222a2b 100644 --- a/lib/filterx/filterx-scope.c +++ b/lib/filterx/filterx-scope.c @@ -102,7 +102,7 @@ struct _FilterXScope { GAtomicCounter ref_cnt; GArray *variables; - guint32 generation:20, write_protected, dirty, syncable; + guint32 generation:20, write_protected, dirty, syncable, log_msg_has_changes; }; static gboolean @@ -138,6 +138,24 @@ _lookup_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXVariab return FALSE; } +void +filterx_scope_set_log_msg_has_changes(FilterXScope *self) +{ + self->log_msg_has_changes = TRUE; +} + +void +filterx_scope_clear_log_msg_has_changes(FilterXScope *self) +{ + self->log_msg_has_changes = FALSE; +} + +gboolean +filterx_scope_has_log_msg_changes(FilterXScope *self) +{ + return self->log_msg_has_changes; +} + void filterx_scope_set_dirty(FilterXScope *self) { @@ -173,6 +191,8 @@ filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle) if (filterx_variable_handle_is_floating(handle) && !v->declared && v->generation != self->generation) return NULL; + if (!filterx_variable_handle_is_floating(handle) && v->generation == 0 && self->syncable) + return NULL; return v; } return NULL; @@ -382,6 +402,7 @@ filterx_scope_clone(FilterXScope *other) if (other->variables->len > 0) self->dirty = other->dirty; self->syncable = other->syncable; + self->log_msg_has_changes = other->log_msg_has_changes; msg_trace("Filterx clone finished", evt_tag_printf("scope", "%p", self), evt_tag_printf("other", "%p", other), @@ -436,3 +457,19 @@ filterx_scope_unref(FilterXScope *self) if (self && (g_atomic_counter_dec_and_test(&self->ref_cnt))) _free(self); } + +void +filterx_scope_invalidate_log_msg_cache(FilterXScope *self) +{ + g_assert(filterx_scope_has_log_msg_changes(self)); + + for (gint i = 0; i < self->variables->len; i++) + { + FilterXVariable *v = &g_array_index(self->variables, FilterXVariable, i); + + if (!filterx_variable_is_floating(v) && self->syncable) + v->generation = 0; + } + + filterx_scope_clear_log_msg_has_changes(self); +} diff --git a/lib/filterx/filterx-scope.h b/lib/filterx/filterx-scope.h index fcf346d62d..07ad1e5207 100644 --- a/lib/filterx/filterx-scope.h +++ b/lib/filterx/filterx-scope.h @@ -54,10 +54,14 @@ gboolean filterx_variable_is_set(FilterXVariable *v); * upon the exit from the scope. * */ + typedef struct _FilterXScope FilterXScope; typedef gboolean (*FilterXScopeForeachFunc)(FilterXVariable *variable, gpointer user_data); +void filterx_scope_set_log_msg_has_changes(FilterXScope *self); +void filterx_scope_clear_log_msg_has_changes(FilterXScope *self); +gboolean filterx_scope_has_log_msg_changes(FilterXScope *self); void filterx_scope_set_dirty(FilterXScope *self); gboolean filterx_scope_is_dirty(FilterXScope *self); void filterx_scope_sync(FilterXScope *self, LogMessage *msg); @@ -71,6 +75,7 @@ FilterXVariable *filterx_scope_register_declared_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXObject *initial_value); gboolean filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, gpointer user_data); +void filterx_scope_invalidate_log_msg_cache(FilterXScope *self); /* copy on write */ void filterx_scope_write_protect(FilterXScope *self); @@ -80,4 +85,4 @@ FilterXScope *filterx_scope_new(void); FilterXScope *filterx_scope_ref(FilterXScope *self); void filterx_scope_unref(FilterXScope *self); -#endif \ No newline at end of file +#endif diff --git a/lib/logmsg/logmsg.c b/lib/logmsg/logmsg.c index 68a1ceca9e..67fce48f2c 100644 --- a/lib/logmsg/logmsg.c +++ b/lib/logmsg/logmsg.c @@ -323,6 +323,11 @@ log_msg_make_writable(LogMessage **pself, const LogPathOptions *path_options) log_msg_unref(*pself); *pself = new; } + if(path_options->filterx_context && path_options->filterx_context->scope) + { + filterx_scope_make_writable(&path_options->filterx_context->scope); + filterx_scope_set_log_msg_has_changes(path_options->filterx_context->scope); + } return *pself; } diff --git a/lib/logpipe.h b/lib/logpipe.h index b955be567c..5d40c2eadd 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -72,8 +72,8 @@ /* node created directly by the user */ #define PIF_CONFIG_RELATED 0x0100 -/* sync filterx state and message in right before calling queue() */ -#define PIF_SYNC_FILTERX 0x0200 +/* sync filterx state to message in right before calling queue() */ +#define PIF_SYNC_FILTERX_TO_MSG 0x0200 /* private flags range, to be used by other LogPipe instances for their own purposes */ @@ -222,8 +222,8 @@ struct _LogPathOptions FilterXEvalContext *filterx_context; }; -#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL } -#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL } +#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL, NULL} +#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL, NULL } /* * Embed a step in our LogPathOptions chain. @@ -459,7 +459,7 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) } } - if ((s->flags & PIF_SYNC_FILTERX)) + if ((s->flags & PIF_SYNC_FILTERX_TO_MSG)) filterx_eval_sync_message(path_options->filterx_context, &msg, path_options); if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL | PIF_JUNCTION_END | PIF_CONDITIONAL_MIDPOINT))) diff --git a/lib/parser/parser-expr.c b/lib/parser/parser-expr.c index 5d48852ab9..5dc3926ffb 100644 --- a/lib/parser/parser-expr.c +++ b/lib/parser/parser-expr.c @@ -182,7 +182,7 @@ void log_parser_init_instance(LogParser *self, GlobalConfig *cfg) { log_pipe_init_instance(&self->super, cfg); - self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX; + self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG; self->super.init = log_parser_init_method; self->super.deinit = log_parser_deinit_method; self->super.free_fn = log_parser_free_method; diff --git a/lib/rewrite/rewrite-expr.c b/lib/rewrite/rewrite-expr.c index d696f09339..aa28988cb4 100644 --- a/lib/rewrite/rewrite-expr.c +++ b/lib/rewrite/rewrite-expr.c @@ -98,7 +98,7 @@ log_rewrite_init_instance(LogRewrite *self, GlobalConfig *cfg) { log_pipe_init_instance(&self->super, cfg); /* indicate that the rewrite rule is changing the message */ - self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX; + self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG; self->super.free_fn = log_rewrite_free_method; self->super.queue = log_rewrite_queue; self->super.init = log_rewrite_init_method; diff --git a/modules/csvparser/tests/test_csvparser.c b/modules/csvparser/tests/test_csvparser.c index b232f7e20d..552697e668 100644 --- a/modules/csvparser/tests/test_csvparser.c +++ b/modules/csvparser/tests/test_csvparser.c @@ -840,7 +840,8 @@ ParameterizedTest(CsvParserTestParam *param, parser, test_csv_parser) cr_assert(log_pipe_init(&pclone->super)); nvtable = nv_table_ref(logmsg->payload); - success = log_parser_process(pclone, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + success = log_parser_process(pclone, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); nv_table_unref(nvtable); cr_assert_not((success && !param->expected_values[0]), "unexpected match; msg=%s\n", param->msg); diff --git a/modules/timestamp/tests/test_date.c b/modules/timestamp/tests/test_date.c index 5956afceaa..e866aa7b5f 100644 --- a/modules/timestamp/tests/test_date.c +++ b/modules/timestamp/tests/test_date.c @@ -186,7 +186,8 @@ ParameterizedTest(struct date_params *params, date, test_date_parser) GString *res = g_string_sized_new(128); logmsg = _construct_logmsg(params->msg); - success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); cr_assert(success, "unable to parse format=%s msg=%s", params->format, params->msg); @@ -203,11 +204,13 @@ ParameterizedTest(struct date_params *params, date, test_date_parser) Test(date, test_date_with_additional_text_at_the_end) { + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; const gchar *msg = "2015-01-26T16:14:49+0300 Disappointing log file"; LogParser *parser = _construct_parser(NULL, NULL, LM_TS_STAMP); LogMessage *logmsg = _construct_logmsg(msg); - gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); + gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), + -1); cr_assert_not(success, "successfully parsed but expected failure, msg=%s", msg); @@ -247,8 +250,10 @@ ParameterizedTest(struct date_with_multiple_formats_params *params, date, test_d date_parser_set_time_stamp(parser, LM_TS_STAMP); LogMessage *logmsg = _construct_logmsg(params->msg); + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; - gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); + gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), + -1); cr_assert(success, "unable to parse msg=%s with a list of formats", params->msg); @@ -269,7 +274,10 @@ Test(date, test_date_with_guess_timezone) date_parser_process_flag(parser, "guess-timezone"); LogMessage *logmsg = _construct_logmsg(msg); - gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1); + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + + gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), + -1); cr_assert(success, "failed to parse timestamp, msg=%s", msg); append_format_unix_time(&logmsg->timestamps[LM_TS_STAMP], res, TS_FMT_ISO, -1, 0); diff --git a/tests/light/functional_tests/filterx/test_filterx.py b/tests/light/functional_tests/filterx/test_filterx.py index 046b584655..b6e6200c5b 100644 --- a/tests/light/functional_tests/filterx/test_filterx.py +++ b/tests/light/functional_tests/filterx/test_filterx.py @@ -2395,3 +2395,55 @@ def test_parse_leef(config, syslog_ng): r"""}""" + "\n" ) assert file_true.read_log() == exp + + +def test_proper_generation_counter(config, syslog_ng): + file_true = config.create_file_destination(file_name="dest-true.log", template="'$MSG\n'") + file_false = config.create_file_destination(file_name="dest-false.log", template="'$MSG\n'") + + raw_conf = f""" +@version: {config.get_version()} + +options {{ stats(level(1)); }}; + +source genmsg {{ + example-msg-generator(num(1) template("dummy message") values("bar" => "bar_value")); +}}; + +destination dest_true {{ + {render_statement(file_true)}; +}}; + +destination dest_false {{ + {render_statement(file_false)}; +}}; + +log {{ + source(genmsg); + if {{ + filterx {{ + $foo = "foovalue"; # Could have come from the log message as well, doesn't matter + $ISODATE; # Special case for macro resolution + unset(${{values.str}}); # Must come from the log message + }}; + + rewrite {{ set("almafa", value("foo")); }}; + parser {{ date-parser(format("%Y-%m-%dT%H:%M:%S%z") template("2000-01-01T00:00:00+01:00")); }}; + rewrite {{ set("kortefa", value("values.str")); }}; + + filterx {{ + $MSG = {{"from_nvtable": $foo, "from_a_macro": $ISODATE, "unset_then_set": ${{values.str}} ?? "not found"}}; + }}; + destination(dest_true); + }} else {{ + destination(dest_false); + }}; +}}; +""" + config.set_raw_config(raw_conf) + + syslog_ng.start(config) + + assert "processed" in file_true.get_stats() + assert file_true.get_stats()["processed"] == 1 + assert file_true.read_log() == '{"from_nvtable":"almafa","from_a_macro":"2000-01-01T00:00:00+01:00","unset_then_set":"kortefa"}\n' diff --git a/tests/light/src/syslog_ng/syslog_ng_cli.py b/tests/light/src/syslog_ng/syslog_ng_cli.py index a1bba09e42..b9af9280b4 100644 --- a/tests/light/src/syslog_ng/syslog_ng_cli.py +++ b/tests/light/src/syslog_ng/syslog_ng_cli.py @@ -72,7 +72,7 @@ def __syntax_check(self): result = self.__syntax_only() if result["exit_code"] != 0: logger.error(result["stderr"]) - raise Exception("syslog-ng can not started exit_code={}".format(result["exit_code"])) + raise Exception("syslog-ng could not start exit_code={}".format(result["exit_code"])) def is_process_running(self): return self.__process and self.__process.poll() is None