Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ft: TCP adaptor #2067

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor do
use TypedStruct

import Ecto.Changeset

alias Logflare.Backends.Adaptor.TCPAdaptor.Pool
alias Logflare.Backends.Adaptor.TCPAdaptor.Syslog

@behaviour Logflare.Backends.Adaptor

typedstruct enforce: true do
field(:tls, boolean())
field(:host, String.t())
field(:port, non_neg_integer())
end

@impl true
def start_link({_source, backend}) do
Pool.start_link(backend.config)
end

@impl true
def cast_config(params) do
{%{}, %{tls: :bool, host: :string, port: :integer}}
|> cast(params, [:tls, :host, :port])
end

@impl true
def validate_config(changeset) do
changeset
# Port is at most max(u16)
|> validate_inclusion(:port, 0..0xFFFF)
end

@impl true
def ingest(pool, log_events, _opts) do
content = Enum.map(log_events, &Syslog.format(&1, []))

Pool.send(pool, content)
end

@impl true
def execute_query(_id, _query), do: {:error, :not_implemented}
end
52 changes: 52 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Pool do
@behaviour NimblePool

def start_link(config) do
NimblePool.start_link(worker: {__MODULE__, config}, lazy: true)
end

def send(pool, message) do
NimblePool.checkout!(pool, :send, fn _from, socket ->
result = :gen_tcp.send(socket, message)

{result, result}
end)
end

@impl NimblePool
def init_worker(%{host: host, port: port} = state) do
this = self()

# TODO: Add SSL support there
async = fn ->
{:ok, socket} =
:gen_tcp.connect(to_charlist(host), port,
mode: :binary,
nodelay: true
)

:gen_tcp.controlling_process(socket, this)

socket
end

{:async, async, state}
end

@impl NimblePool
def handle_checkout(_command, _from, socket, state) do
{:ok, socket, socket, state}
end

@impl NimblePool
# Ignore any data sent over the socket
def handle_info({:tcp, socket, _}, socket),
do: {:ok, socket}

def handle_info({:tcp_closed, socket}, socket),
do: {:remove, "connection closed"}

def handle_info(_other, socket) do
{:ok, socket}
end
end
81 changes: 81 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Syslog do
@moduledoc """
Implementation of [RFC5424 The Syslog Protocol][]

It uses [octet-counting framing][].

[RFC5424]: https://www.rfc-editor.org/rfc/rfc5424
[octet-counting framing]: https://www.rfc-editor.org/rfc/rfc6587#section-3.4.1
"""

alias Logflare.LogEvent

# TODO: Change it to real value
@pen 62137
Comment on lines +13 to +14
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This need to be updated as soon as Supabase or Logflare will have assigned Private Enterprise Number from IANA.


def format(%LogEvent{} = le, options) do
msg = [
header(le, options),
" ",
structured_data(le, options),
" ",
Jason.encode!(le.body),
"\n"
]

# TODO: Add support for non-transparent framing
[to_string(IO.iodata_length(msg)), ?\s, msg]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently only octet-counting framing is implemented as I have no idea how to do nested configuration there. Non-transparent framing (with newline endings) is simple to implement, as all that is needed is simply removing byte count and space.

end

defp header(%LogEvent{} = le, options) do
level = to_level(le.body["level"] || le.body["metadata"]["level"])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not have an idea how to extract level from log event in better way. So I am open to having better solution there.

facility = options[:facility] || 16
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Facility is currently not configurable, but honestly I do not know if there is much need for that.


ingested_at = DateTime.from_naive!(le.ingested_at, "Etc/UTC")

id = Ecto.UUID.dump!(le.id) |> Base.encode32(case: :lower, padding: false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of 32 characters limit I decided to encode UUIDs with Base32. Theoretically we could just remove - (dashes) from canonical representation, but I think that this approach is effective enough as well as it reduces amount of data that needs to be transferred.


[
# Level and facility
"<#{facility * 8 + level}>1 ",
DateTime.to_iso8601(ingested_at),
# XXX: Unknown hostname?
" -",
Comment on lines +42 to +43
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left hostname field undefined, though if someone has proposal what could be in this field, then I am open to filling it.

" ",
le.source.name,
# Unknown procname
" -",
" ",
id
]
end

defp structured_data(%LogEvent{} = le, _options) do
[
"[source@#{@pen} name=#{inspect(le.source.name)} id=\"#{le.source.id}\"]"
]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are more metadata informations that should be available there, then I am open to suggestions.

end

@levels Map.new(
Enum.with_index(~w[emergency alert critical error warning notice informational debug])
)
@shorhands %{
"emer" => @levels["emergency"],
"crit" => @levels["critical"],
"err" => @levels["error"],
"warn" => @levels["warning"],
"info" => @levels["informational"]
}

@default @levels["notice"]

defp to_level(level) when level in 0..7, do: level

defp to_level(str) when is_binary(str) do
str = String.downcase(str)
# Unquote there to force compile time evaluation
@levels[str] || @shorhands[str] || @default
end

defp to_level(_), do: @default
end
3 changes: 2 additions & 1 deletion lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Logflare.Backends.Backend do
elastic: Adaptor.ElasticAdaptor,
datadog: Adaptor.DatadogAdaptor,
postgres: Adaptor.PostgresAdaptor,
bigquery: Adaptor.BigQueryAdaptor
bigquery: Adaptor.BigQueryAdaptor,
tcp: Adaptor.TCPAdaptor
}

typed_schema "backends" do
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"test.only": :test,
"test.format": :test,
"test.compile": :test,
"test.security": :test,

Check warning on line 20 in mix.exs

View workflow job for this annotation

GitHub Actions / Build and test

~R/.../ is deprecated, use ~r/.../ instead
"test.typings": :test,
coveralls: :test,
"coveralls.detail": :test,
Expand Down Expand Up @@ -125,6 +125,7 @@
{:swoosh, "~> 0.23"},
{:ex_twilio, "~> 0.8.1"},
{:tesla, "~> 1.6"},
{:nimble_pool, "~> 1.1"},

# Concurrency and pipelines
{:broadway, "~> 1.0.6"},
Expand Down
162 changes: 162 additions & 0 deletions test/logflare/backends/adaptor/tcp_adaptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptorTest do
use Logflare.DataCase

@subject Logflare.Backends.Adaptor.TCPAdaptor

doctest @subject

setup do
user = insert(:user)
source = insert(:source, user_id: user.id)

{port, socket} = listen()

backend =
insert(:backend,
type: :tcp,
sources: [source],
config: %{host: "localhost", port: port, tls: false}
)

{:ok, source: source, backend: backend, port: port, socket: socket}
end

describe "ingest/3" do
test "simple message", %{source: source, backend: backend} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {:tcp, _msg}, 5000
end

test "message contains source ID", %{source: source, backend: backend} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {:tcp, msg}, 5000

assert msg =~ ~r/id="#{source.id}"/
end
end

describe "telegraf" do
@tag telegraf: true
setup do
user = insert(:user)
source = insert(:source, user_id: user.id)
{:ok, port, tcp_port} = telegraf()

backend =
insert(:backend,
type: :tcp,
sources: [source],
config: %{host: "localhost", port: tcp_port, tls: false}
)

{:ok, syslog_port: tcp_port, telegraf: port, backend: backend, source: source}
end

test "simple message", %{source: source, backend: backend, telegraf: port} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {^port, {:data, {:eol, data}}}, 10_000
content = Jason.decode!(data)
assert "syslog" == content["name"]
end
end

# Simple TCP server
defp listen do
this = self()

spawn_link(fn ->
{:ok, sock} =
:gen_tcp.listen(0,
mode: :binary,
active: :once
)

{:ok, port} = :inet.port(sock)

send(this, {port, sock})

acceptor(sock, this)
end)

receive do
{port, sock} -> {port, sock}
end
end

defp acceptor(socket, parent) do
{:ok, lsock} = :gen_tcp.accept(socket)
ref = make_ref()

pid =
spawn_link(fn ->
receive do
^ref -> server(lsock, parent)
end
end)

:gen_tcp.controlling_process(lsock, pid)
send(pid, ref)

acceptor(socket, parent)
end

defp server(sock, pid) do
receive do
{:tcp_close, ^sock} ->
:ok

{:tcp, ^sock, msg} ->
send(pid, {:tcp, msg})
server(sock, pid)
end
end

defp telegraf(options \\ []) do
opts =
Map.merge(
%{
framing: "octet-counting",
port: 6789
},
Map.new(options)
)

env = [
{~c"SYSLOG_PORT", to_charlist(opts.port)},
{~c"SYSLOG_FRAMING", to_charlist(opts.framing)}
]

wrapper = Path.expand("./test/support/syslog/run.sh")
telegraf = System.find_executable("telegraf")

port =
Port.open(
{:spawn_executable, to_charlist(wrapper)},
[
:binary,
line: 16 * 1024,
env: env,
args: [telegraf, "--config", "test/support/syslog/telegraf.conf"]
]
)

Process.sleep(1000)

{:ok, port, opts.port}
end
end
24 changes: 24 additions & 0 deletions test/support/syslog/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

set -eu -o pipefail

# Start the program in the background
exec "$@" &
pid1=$!

# Silence warnings from here on
exec >/dev/null 2>&1

# Read from stdin in the background and
# kill running program when stdin closes
exec 0<&0 $(
while read; do :; done
kill -KILL $pid1
) &
pid2=$!

# Clean up
wait $pid1
ret=$?
kill -KILL $pid2
exit $ret
Loading
Loading