From 1cc68af4f4c4d00e8240b8abd06b307ee6766b99 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Niemier?=
Date: Mon, 20 May 2024 13:10:35 +0200
Subject: [PATCH] ft: TCP backend

---
 lib/logflare/backends/adaptor/tcp_adaptor.ex  |  44 +++++
 .../backends/adaptor/tcp_adaptor/pool.ex      |  52 ++++++
 .../backends/adaptor/tcp_adaptor/syslog.ex    |  81 +++++++++
 lib/logflare/backends/backend.ex              |   3 +-
 mix.exs                                       |   1 +
 .../backends/adaptor/tcp_adaptor_test.exs     | 162 ++++++++++++++++++
 test/support/syslog/run.sh                    |  24 +++
 test/support/syslog/telegraf.conf             | 128 ++++++++++++++
 test/test_helper.exs                          |  11 +-
 9 files changed, 504 insertions(+), 2 deletions(-)
 create mode 100644 lib/logflare/backends/adaptor/tcp_adaptor.ex
 create mode 100644 lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
 create mode 100644 lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
 create mode 100644 test/logflare/backends/adaptor/tcp_adaptor_test.exs
 create mode 100755 test/support/syslog/run.sh
 create mode 100644 test/support/syslog/telegraf.conf

diff --git a/lib/logflare/backends/adaptor/tcp_adaptor.ex b/lib/logflare/backends/adaptor/tcp_adaptor.ex
new file mode 100644
index 000000000..0efc7db12
--- /dev/null
+++ b/lib/logflare/backends/adaptor/tcp_adaptor.ex
@@ -0,0 +1,44 @@
+defmodule Logflare.Backends.Adaptor.TCPAdaptor do
+  use TypedStruct
+
+  import Ecto.Changeset
+
+  alias Logflare.Backends.Adaptor.TCPAdaptor.Pool
+  alias Logflare.Backends.Adaptor.TCPAdaptor.Syslog
+
+  @behaviour Logflare.Backends.Adaptor
+
+  typedstruct enforce: true do
+    field(:tls, boolean())
+    field(:host, String.t())
+    field(:port, non_neg_integer())
+  end
+
+  @impl true
+  def start_link({_source, backend}) do
+    Pool.start_link(backend.config)
+  end
+
+  @impl true
+  def cast_config(params) do
+    {%{}, %{tls: :boolean, host: :string, port: :integer}}
+    |> cast(params, [:tls, :host, :port])
+  end
+
+  @impl true
+  def validate_config(changeset) do
+    changeset
+    # Port is at most max(u16)
+    |> validate_inclusion(:port, 0..0xFFFF)
+  end
+
+  @impl true
+  def ingest(pool, log_events, _opts) do
+    content = Enum.map(log_events, &Syslog.format(&1, []))
+
+    Pool.send(pool, content)
+  end
+
+  @impl true
+  def execute_query(_id, _query), do: {:error, :not_implemented}
+end
diff --git a/lib/logflare/backends/adaptor/tcp_adaptor/pool.ex b/lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
new file mode 100644
index 000000000..0e7662c23
--- /dev/null
+++ b/lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
@@ -0,0 +1,52 @@
+defmodule Logflare.Backends.Adaptor.TCPAdaptor.Pool do
+  @behaviour NimblePool
+
+  def start_link(config) do
+    NimblePool.start_link(worker: {__MODULE__, config}, lazy: true)
+  end
+
+  def send(pool, message) do
+    NimblePool.checkout!(pool, :send, fn _from, socket ->
+      result = :gen_tcp.send(socket, message)
+
+      {result, result}
+    end)
+  end
+
+  @impl NimblePool
+  def init_worker(%{host: host, port: port} = state) do
+    this = self()
+
+    # TODO: Add SSL support here
+    async = fn ->
+      {:ok, socket} =
+        :gen_tcp.connect(to_charlist(host), port,
+          mode: :binary,
+          nodelay: true
+        )
+
+      :gen_tcp.controlling_process(socket, this)
+
+      socket
+    end
+
+    {:async, async, state}
+  end
+
+  @impl NimblePool
+  def handle_checkout(_command, _from, socket, state) do
+    {:ok, socket, socket, state}
+  end
+
+  @impl NimblePool
+  # Ignore any data sent over the socket
+  def handle_info({:tcp, socket, _}, socket),
+    do: {:ok, socket}
+
+  def handle_info({:tcp_closed, socket}, socket),
+    do: {:remove, "connection closed"}
+
+  def handle_info(_other, socket) do
+    {:ok, socket}
+  end
+end
diff --git a/lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex b/lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
new file mode 100644
index 000000000..599f452be
--- /dev/null
+++ b/lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
@@ -0,0 +1,81 @@
+defmodule Logflare.Backends.Adaptor.TCPAdaptor.Syslog do
+  @moduledoc """
+  Implementation of [RFC5424 The Syslog Protocol][RFC5424].
+
+  It uses [octet-counting framing][].
+
+  [RFC5424]: https://www.rfc-editor.org/rfc/rfc5424
+  [octet-counting framing]: https://www.rfc-editor.org/rfc/rfc6587#section-3.4.1
+  """
+
+  alias Logflare.LogEvent
+
+  # TODO: Change it to real value
+  @pen 62137
+
+  def format(%LogEvent{} = le, options) do
+    msg = [
+      header(le, options),
+      " ",
+      structured_data(le, options),
+      " ",
+      Jason.encode!(le.body),
+      "\n"
+    ]
+
+    # TODO: Add support for non-transparent framing
+    [to_string(IO.iodata_length(msg)), ?\s, msg]
+  end
+
+  defp header(%LogEvent{} = le, options) do
+    level = to_level(le.body["level"] || le.body["metadata"]["level"])
+    facility = options[:facility] || 16
+
+    ingested_at = DateTime.from_naive!(le.ingested_at, "Etc/UTC")
+
+    id = Ecto.UUID.dump!(le.id) |> Base.encode32(case: :lower, padding: false)
+
+    [
+      # PRI is facility * 8 + severity level
+      "<#{facility * 8 + level}>1 ",
+      DateTime.to_iso8601(ingested_at),
+      # XXX: Unknown hostname?
+      " -",
+      " ",
+      le.source.name,
+      # Unknown procname
+      " -",
+      " ",
+      id
+    ]
+  end
+
+  defp structured_data(%LogEvent{} = le, _options) do
+    [
+      "[source@#{@pen} name=#{inspect(le.source.name)} id=\"#{le.source.id}\"]"
+    ]
+  end
+
+  @levels Map.new(
+            Enum.with_index(~w[emergency alert critical error warning notice informational debug])
+          )
+  @shorthands %{
+    "emer" => @levels["emergency"],
+    "crit" => @levels["critical"],
+    "err" => @levels["error"],
+    "warn" => @levels["warning"],
+    "info" => @levels["informational"]
+  }
+
+  @default @levels["notice"]
+
+  defp to_level(level) when level in 0..7, do: level
+
+  defp to_level(str) when is_binary(str) do
+    str = String.downcase(str)
+    # The module attribute maps are built and inlined at compile time
+    @levels[str] || @shorthands[str] || @default
+  end
+
+  defp to_level(_), do: @default
+end
diff --git a/lib/logflare/backends/backend.ex b/lib/logflare/backends/backend.ex
index e4a7ee439..2e41cb643 100644
--- a/lib/logflare/backends/backend.ex
+++ b/lib/logflare/backends/backend.ex
@@ -15,7 +15,8 @@ defmodule Logflare.Backends.Backend do
     elastic: Adaptor.ElasticAdaptor,
     datadog: Adaptor.DatadogAdaptor,
     postgres: Adaptor.PostgresAdaptor,
-    bigquery: Adaptor.BigQueryAdaptor
+    bigquery: Adaptor.BigQueryAdaptor,
+    tcp: Adaptor.TCPAdaptor
   }
 
   typed_schema "backends" do
diff --git a/mix.exs b/mix.exs
index 8f5f404a7..cfb2d0d9c 100644
--- a/mix.exs
+++ b/mix.exs
@@ -125,6 +125,7 @@ defmodule Logflare.Mixfile do
       {:swoosh, "~> 0.23"},
       {:ex_twilio, "~> 0.8.1"},
       {:tesla, "~> 1.6"},
+      {:nimble_pool, "~> 1.1"},
 
       # Concurrency and pipelines
       {:broadway, "~> 1.0.6"},
diff --git a/test/logflare/backends/adaptor/tcp_adaptor_test.exs b/test/logflare/backends/adaptor/tcp_adaptor_test.exs
new file mode 100644
index 000000000..40ff09423
--- /dev/null
+++ b/test/logflare/backends/adaptor/tcp_adaptor_test.exs
@@ -0,0 +1,162 @@
+defmodule Logflare.Backends.Adaptor.TCPAdaptorTest do
+  use Logflare.DataCase
+
+  @subject Logflare.Backends.Adaptor.TCPAdaptor
+
+  doctest @subject
+
+  setup do
+    user = insert(:user)
+    source = insert(:source, user_id: user.id)
+
+    {port, socket} = listen()
+
+    backend =
+      insert(:backend,
+        type: :tcp,
+        sources: [source],
+        config: %{host: "localhost", port: port, tls: false}
"localhost", port: port, tls: false} + ) + + {:ok, source: source, backend: backend, port: port, socket: socket} + end + + describe "ingest/3" do + test "simple message", %{source: source, backend: backend} do + le = build(:log_event, source: source) + + {:ok, pid} = @subject.start_link({source, backend}) + + _ = @subject.ingest(pid, [le], []) + + assert_receive {:tcp, _msg}, 5000 + end + + test "message contains source ID", %{source: source, backend: backend} do + le = build(:log_event, source: source) + + {:ok, pid} = @subject.start_link({source, backend}) + + _ = @subject.ingest(pid, [le], []) + + assert_receive {:tcp, msg}, 5000 + + assert msg =~ ~r/id="#{source.id}"/ + end + end + + describe "telegraf" do + @tag telegraf: true + setup do + user = insert(:user) + source = insert(:source, user_id: user.id) + {:ok, port, tcp_port} = telegraf() + + backend = + insert(:backend, + type: :tcp, + sources: [source], + config: %{host: "localhost", port: tcp_port, tls: false} + ) + + {:ok, syslog_port: tcp_port, telegraf: port, backend: backend, source: source} + end + + test "simple message", %{source: source, backend: backend, telegraf: port} do + le = build(:log_event, source: source) + + {:ok, pid} = @subject.start_link({source, backend}) + + _ = @subject.ingest(pid, [le], []) + + assert_receive {^port, {:data, {:eol, data}}}, 10_000 + content = Jason.decode!(data) + assert "syslog" == content["name"] + end + end + + # Simple TCP server + defp listen do + this = self() + + spawn_link(fn -> + {:ok, sock} = + :gen_tcp.listen(0, + mode: :binary, + active: :once + ) + + {:ok, port} = :inet.port(sock) + + send(this, {port, sock}) + + acceptor(sock, this) + end) + + receive do + {port, sock} -> {port, sock} + end + end + + defp acceptor(socket, parent) do + {:ok, lsock} = :gen_tcp.accept(socket) + ref = make_ref() + + pid = + spawn_link(fn -> + receive do + ^ref -> server(lsock, parent) + end + end) + + :gen_tcp.controlling_process(lsock, pid) + send(pid, ref) + + acceptor(socket, parent) + end + + defp server(sock, pid) do + receive do + {:tcp_close, ^sock} -> + :ok + + {:tcp, ^sock, msg} -> + send(pid, {:tcp, msg}) + server(sock, pid) + end + end + + defp telegraf(options \\ []) do + opts = + Map.merge( + %{ + framing: "octet-counting", + port: 6789 + }, + Map.new(options) + ) + + env = [ + {~c"SYSLOG_PORT", to_charlist(opts.port)}, + {~c"SYSLOG_FRAMING", to_charlist(opts.framing)} + ] + + wrapper = Path.expand("./test/support/syslog/run.sh") + telegraf = System.find_executable("telegraf") + + port = + Port.open( + {:spawn_executable, to_charlist(wrapper)}, + [ + :binary, + line: 16 * 1024, + env: env, + args: [telegraf, "--config", "test/support/syslog/telegraf.conf"] + ] + ) + + Process.sleep(1000) + + {:ok, port, opts.port} + end +end diff --git a/test/support/syslog/run.sh b/test/support/syslog/run.sh new file mode 100755 index 000000000..2168304c8 --- /dev/null +++ b/test/support/syslog/run.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +set -eu -o pipefail + +# Start the program in the background +exec "$@" & +pid1=$! + +# Silence warnings from here on +exec >/dev/null 2>&1 + +# Read from stdin in the background and +# kill running program when stdin closes +exec 0<&0 $( + while read; do :; done + kill -KILL $pid1 +) & +pid2=$! + +# Clean up +wait $pid1 +ret=$? 
+kill -KILL $pid2
+exit $ret
diff --git a/test/support/syslog/telegraf.conf b/test/support/syslog/telegraf.conf
new file mode 100644
index 000000000..b05345e89
--- /dev/null
+++ b/test/support/syslog/telegraf.conf
@@ -0,0 +1,128 @@
+# Telegraf Configuration
+#
+# Telegraf is entirely plugin driven. All metrics are gathered from the
+# declared inputs, and sent to the declared outputs.
+#
+# Plugins must be declared in here to be active.
+# To deactivate a plugin, comment out the name and any variables.
+#
+# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
+# file would generate.
+#
+# Environment variables can be used anywhere in this config file, simply surround
+# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
+# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
+
+
+# Global tags can be specified here in key="value" format.
+[global_tags]
+  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
+  # rack = "1a"
+  ## Environment variables can be used as tags, and throughout the config file
+  # user = "$USER"
+
+# Configuration for telegraf agent
+[agent]
+  ## Default data collection interval for all inputs
+  interval = "1s"
+  ## Rounds collection interval to 'interval'
+  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
+  round_interval = false
+
+  ## Telegraf will send metrics to outputs in batches of at most
+  ## metric_batch_size metrics.
+  ## This controls the size of writes that Telegraf sends to output plugins.
+  metric_batch_size = 1
+
+  ## Maximum number of unwritten metrics per output. Increasing this value
+  ## allows for longer periods of output downtime without dropping metrics at the
+  ## cost of higher maximum memory usage.
+  metric_buffer_limit = 1
+
+  ## Collection jitter is used to jitter the collection by a random amount.
+  ## Each plugin will sleep for a random time within jitter before collecting.
+  ## This can be used to avoid many plugins querying things like sysfs at the
+  ## same time, which can have a measurable effect on the system.
+  collection_jitter = "0s"
+
+  ## Default flushing interval for all outputs. Maximum flush_interval will be
+  ## flush_interval + flush_jitter
+  flush_interval = "1s"
+  ## Jitter the flush interval by a random amount. This is primarily to avoid
+  ## large write spikes for users running a large number of telegraf instances.
+  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+  flush_jitter = "0s"
+
+  precision = "0s"
+
+  ## Override default hostname, if empty use os.Hostname()
+  hostname = ""
+  ## If set to true, do not set the "host" tag in the telegraf agent.
+  omit_hostname = false
+
+###############################################################################
+#                            OUTPUT PLUGINS                                   #
+###############################################################################
+
+# # Send telegraf metrics to file(s)
+[[outputs.file]]
+  ## Files to write to, "stdout" is a specially handled file.
+  files = ["stdout"]
+
+  ## Data format to output.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  data_format = "json"
+
+###############################################################################
+#                            SERVICE INPUT PLUGINS                            #
+###############################################################################
+
+[[inputs.syslog]]
+  ## Protocol, address and port to host the syslog receiver.
+  ## If no host is specified, then localhost is used.
+  ## If no port is specified, 6514 is used (RFC5425#section-4.1).
+  ##   ex: server = "tcp://localhost:6514"
+  ##       server = "udp://:6514"
+  ##       server = "unix:///var/run/telegraf-syslog.sock"
+  ## When using tcp, consider using 'tcp4' or 'tcp6' to force the usage of IPv4
+  ## or IPV6 respectively. There are cases, where when not specified, a system
+  ## may force an IPv4 mapped IPv6 address.
+  server = "tcp://127.0.0.1:${SYSLOG_PORT}"
+
+  ## Read timeout (only available on stream sockets like TCP)
+  ## Zero means unlimited.
+  # read_timeout = "0s"
+
+  ## Optional TLS configuration (only available on stream sockets like TCP)
+  # tls_cert = "/etc/telegraf/cert.pem"
+  # tls_key = "/etc/telegraf/key.pem"
+  ## Enables client authentication if set.
+  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
+
+  ## Framing technique used for messages transport
+  ## Available settings are:
+  ##   octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1
+  ##   non-transparent -- see RFC6587#section-3.4.2
+  framing = "${SYSLOG_FRAMING}"
+
+  ## The trailer to be expected in case of non-transparent framing (default = "LF").
+  ## Must be one of "LF", or "NUL".
+  trailer = "LF"
+
+  ## Whether to parse in best effort mode or not (default = false).
+  ## By default best effort parsing is off.
+  best_effort = false
+
+  ## The RFC standard to use for message parsing
+  ## By default RFC5424 is used. RFC3164 only supports UDP transport (no streaming support)
+  ## Must be one of "RFC5424", or "RFC3164".
+  syslog_standard = "RFC5424"
+
+  ## Character to prepend to SD-PARAMs (default = "_").
+  ## A syslog message can contain multiple parameters and multiple identifiers within structured data section.
+  ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"]
+  ## For each combination a field is created.
+  ## Its name is created concatenating identifier, sdparam_separator, and parameter name.
+  # sdparam_separator = "_"
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 9d9896850..4afcbd868 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -42,6 +42,15 @@ Mimic.copy(ExTwilio.Message)
 Mimic.stub(Goth)
 Mimic.stub(Finch)
 
-ExUnit.configure(exclude: [integration: true, failing: true, benchmark: true])
+has_telegraf? = not is_nil(System.find_executable("telegraf"))
+
+ExUnit.configure(
+  exclude: [
+    integration: true,
+    failing: true,
+    benchmark: true,
+    telegraf: not has_telegraf?
+  ]
+)
 
 Ecto.Adapters.SQL.Sandbox.mode(Logflare.Repo, :manual)
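
Note on the framing (illustration only, not part of the patch itself): the adaptor relies
on RFC6587 octet-counting, i.e. each syslog message is prefixed with its byte length and
a space so the receiver can split the TCP stream back into discrete messages. A minimal
Elixir sketch of just that framing step, mirroring the last line of Syslog.format/2 (the
payload below is shortened and made up; a real frame carries a full RFC5424 header,
structured data, and the JSON-encoded event body):

    # Prefix an iodata payload with "<byte length> ", as octet-counting requires.
    frame = fn msg -> [to_string(IO.iodata_length(msg)), ?\s, msg] end

    frame.("<133>1 hello\n") |> IO.iodata_to_binary()
    #=> "13 <133>1 hello\n"

This is also why telegraf.conf reads framing = "${SYSLOG_FRAMING}" and the test harness
exports SYSLOG_FRAMING=octet-counting by default.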