Skip to content

Commit

Permalink
ft: TCP backend
Browse files Browse the repository at this point in the history
  • Loading branch information
hauleth committed Jun 14, 2024
1 parent 47654c5 commit 08aefbf
Show file tree
Hide file tree
Showing 9 changed files with 504 additions and 2 deletions.
44 changes: 44 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor do
use TypedStruct

import Ecto.Changeset

alias Logflare.Backends.Adaptor.TCPAdaptor.Pool
alias Logflare.Backends.Adaptor.TCPAdaptor.Syslog

@behaviour Logflare.Backends.Adaptor

typedstruct enforce: true do
field(:tls, boolean())
field(:host, String.t())
field(:port, non_neg_integer())
end

@impl true
def start_link({_source, backend}) do
Pool.start_link(backend.config)
end

@impl true
def cast_config(params) do
{%{}, %{tls: :bool, host: :string, port: :integer}}
|> cast(params, [:tls, :host, :port])
end

@impl true
def validate_config(changeset) do
changeset
# Port is at most max(u16)
|> validate_inclusion(:port, 0..0xFFFF)
end

@impl true
def ingest(pool, log_events, _opts) do
content = Enum.map(log_events, &Syslog.format(&1, []))

Pool.send(pool, content)
end

@impl true
def execute_query(_id, _query), do: {:error, :not_implemented}
end
52 changes: 52 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Pool do
@behaviour NimblePool

def start_link(config) do
NimblePool.start_link(worker: {__MODULE__, config}, lazy: true)
end

def send(pool, message) do
NimblePool.checkout!(pool, :send, fn _from, socket ->
result = :gen_tcp.send(socket, message)

{result, result}
end)
end

@impl NimblePool
def init_worker(%{host: host, port: port} = state) do
this = self()

# TODO: Add SSL support there
async = fn ->
{:ok, socket} =
:gen_tcp.connect(to_charlist(host), port,
mode: :binary,
nodelay: true
)

:gen_tcp.controlling_process(socket, this)

socket
end

{:async, async, state}
end

@impl NimblePool
def handle_checkout(_command, _from, socket, state) do
{:ok, socket, socket, state}
end

@impl NimblePool
# Ignore any data sent over the socket
def handle_info({:tcp, socket, _}, socket),
do: {:ok, socket}

def handle_info({:tcp_closed, socket}, socket),
do: {:remove, "connection closed"}

def handle_info(_other, socket) do
{:ok, socket}
end
end
81 changes: 81 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Syslog do
@moduledoc """
Implementation of [RFC5424 The Syslog Protocol][]
It uses [octet-counting framing][].
[RFC5424]: https://www.rfc-editor.org/rfc/rfc5424
[octet-counting framing]: https://www.rfc-editor.org/rfc/rfc6587#section-3.4.1
"""

alias Logflare.LogEvent

# TODO: Change it to real value
@pen 62137

def format(%LogEvent{} = le, options) do
msg = [
header(le, options),
" ",
structured_data(le, options),
" ",
Jason.encode!(le.body),
"\n"
]

# TODO: Add support for non-transparent framing
[to_string(IO.iodata_length(msg)), ?\s, msg]
end

defp header(%LogEvent{} = le, options) do
level = to_level(le.body["level"] || le.body["metadata"]["level"])
facility = options[:facility] || 16

ingested_at = DateTime.from_naive!(le.ingested_at, "Etc/UTC")

id = Ecto.UUID.dump!(le.id) |> Base.encode32(case: :lower, padding: false)

[
# Level and facility
"<#{facility * 8 + level}>1 ",
DateTime.to_iso8601(ingested_at),
# XXX: Unknown hostname?
" -",
" ",
le.source.name,
# Unknown procname
" -",
" ",
id
]
end

defp structured_data(%LogEvent{} = le, _options) do
[
"[source@#{@pen} name=#{inspect(le.source.name)} id=\"#{le.source.id}\"]"
]
end

@levels Map.new(
Enum.with_index(~w[emergency alert critical error warning notice informational debug])
)
@shorhands %{
"emer" => @levels["emergency"],
"crit" => @levels["critical"],
"err" => @levels["error"],
"warn" => @levels["warning"],
"info" => @levels["informational"]
}

@default @levels["notice"]

defp to_level(level) when level in 0..7, do: level

defp to_level(str) when is_binary(str) do
str = String.downcase(str)
# Unquote there to force compile time evaluation
@levels[str] || @shorhands[str] || @default
end

defp to_level(_), do: @default
end
3 changes: 2 additions & 1 deletion lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Logflare.Backends.Backend do
elastic: Adaptor.ElasticAdaptor,
datadog: Adaptor.DatadogAdaptor,
postgres: Adaptor.PostgresAdaptor,
bigquery: Adaptor.BigQueryAdaptor
bigquery: Adaptor.BigQueryAdaptor,
tcp: Adaptor.TCPAdaptor
}

typed_schema "backends" do
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ defmodule Logflare.Mixfile do
{:swoosh, "~> 0.23"},
{:ex_twilio, "~> 0.8.1"},
{:tesla, "~> 1.6"},
{:nimble_pool, "~> 1.1"},

# Concurrency and pipelines
{:broadway, "~> 1.0.6"},
Expand Down
162 changes: 162 additions & 0 deletions test/logflare/backends/adaptor/tcp_adaptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptorTest do
use Logflare.DataCase

@subject Logflare.Backends.Adaptor.TCPAdaptor

doctest @subject

setup do
user = insert(:user)
source = insert(:source, user_id: user.id)

{port, socket} = listen()

backend =
insert(:backend,
type: :tcp,
sources: [source],
config: %{host: "localhost", port: port, tls: false}
)

{:ok, source: source, backend: backend, port: port, socket: socket}
end

describe "ingest/3" do
test "simple message", %{source: source, backend: backend} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {:tcp, _msg}, 5000
end

test "message contains source ID", %{source: source, backend: backend} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {:tcp, msg}, 5000

assert msg =~ ~r/id="#{source.id}"/
end
end

describe "telegraf" do
@tag telegraf: true
setup do
user = insert(:user)
source = insert(:source, user_id: user.id)
{:ok, port, tcp_port} = telegraf()

backend =
insert(:backend,
type: :tcp,
sources: [source],
config: %{host: "localhost", port: tcp_port, tls: false}
)

{:ok, syslog_port: tcp_port, telegraf: port, backend: backend, source: source}
end

test "simple message", %{source: source, backend: backend, telegraf: port} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {^port, {:data, {:eol, data}}}, 10_000
content = Jason.decode!(data)
assert "syslog" == content["name"]
end
end

# Simple TCP server
defp listen do
this = self()

spawn_link(fn ->
{:ok, sock} =
:gen_tcp.listen(0,
mode: :binary,
active: :once
)

{:ok, port} = :inet.port(sock)

send(this, {port, sock})

acceptor(sock, this)
end)

receive do
{port, sock} -> {port, sock}
end
end

defp acceptor(socket, parent) do
{:ok, lsock} = :gen_tcp.accept(socket)
ref = make_ref()

pid =
spawn_link(fn ->
receive do
^ref -> server(lsock, parent)
end
end)

:gen_tcp.controlling_process(lsock, pid)
send(pid, ref)

acceptor(socket, parent)
end

defp server(sock, pid) do
receive do
{:tcp_close, ^sock} ->
:ok

{:tcp, ^sock, msg} ->
send(pid, {:tcp, msg})
server(sock, pid)
end
end

defp telegraf(options \\ []) do
opts =
Map.merge(
%{
framing: "octet-counting",
port: 6789
},
Map.new(options)
)

env = [
{~c"SYSLOG_PORT", to_charlist(opts.port)},
{~c"SYSLOG_FRAMING", to_charlist(opts.framing)}
]

wrapper = Path.expand("./test/support/syslog/run.sh")
telegraf = System.find_executable("telegraf")

port =
Port.open(
{:spawn_executable, to_charlist(wrapper)},
[
:binary,
line: 16 * 1024,
env: env,
args: [telegraf, "--config", "test/support/syslog/telegraf.conf"]
]
)

Process.sleep(1000)

{:ok, port, opts.port}
end
end
24 changes: 24 additions & 0 deletions test/support/syslog/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

set -eu -o pipefail

# Start the program in the background
exec "$@" &
pid1=$!

# Silence warnings from here on
exec >/dev/null 2>&1

# Read from stdin in the background and
# kill running program when stdin closes
exec 0<&0 $(
while read; do :; done
kill -KILL $pid1
) &
pid2=$!

# Clean up
wait $pid1
ret=$?
kill -KILL $pid2
exit $ret
Loading

0 comments on commit 08aefbf

Please sign in to comment.