Skip to content

Commit

Permalink
feat: http protobuf otel ingestion (#2305)
Browse files Browse the repository at this point in the history
* feat: add descriptor generation

* feat: implement full otel protobuf ingestion

* docs: remove http_protobuf limitation

* chore: version bump

* chore: fix tests that have renamed headers

* fix: failing tests due to no match

* chore: revert commented out lines

* chore: formatting

* chore: fix failing tests

* chore: formatting
  • Loading branch information
Ziinc authored Jan 31, 2025
1 parent 5d23042 commit 32ea7e9
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 93 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,7 @@ gcloud_prod.json
# google keys
.*.gcloud.json

supabase
supabase

examples/**/.dev.vars
examples/**/node_modules
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.9
1.10.10
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ config :opentelemetry,

config :opentelemetry_exporter,
otlp_protocol: :grpc,
otlp_compression: :gzip,
otlp_compression: :gzip,
otlp_endpoint: "https://otel.logflare.app:443",
otlp_headers: [
{"x-source", "my-source-uuid"},
Expand All @@ -63,5 +63,4 @@ config :opentelemetry_exporter,
### Limitations

- Only **traces** are currently supported.
- Only GRPC as the transport protocol is currently supported. HTTP Protobuf is not yet supported.
- Gzip compression is required.
14 changes: 14 additions & 0 deletions lib/logflare/utils/map.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Logflare.Utils.Map do
@doc """
Retrieves a key, regardless of whether it is a string map or an atom map
iex> #{__MODULE__}.get(%{test: 123}, :test)
123
iex> #{__MODULE__}.get(%{"test"=> 123}, :test)
123
"""
@spec get(map :: map(), key :: atom()) :: term() | nil
def get(map, key) when is_atom(key) do
Map.get(map, key) || Map.get(map, Atom.to_string(key))
end
end
2 changes: 2 additions & 0 deletions lib/logflare_grpc/endpoint.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule LogflareGrpc.Endpoint do
use GRPC.Endpoint

intercept(GRPC.Server.Interceptors.Logger, level: :debug)
intercept LogflareGrpc.HttpProtobufInterceptor
run(LogflareGrpc.Trace.Server)
end
14 changes: 14 additions & 0 deletions lib/logflare_grpc/http_protobuf_interceptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule LogflareGrpc.HttpProtobufInterceptor do
@moduledoc false

@behaviour GRPC.Server.Interceptor

def init(opts) do
opts
end

def call(rpc_req, stream, next, opts) do
dbg({rpc_req, stream, next, opts})
{:ok, stream}
end
end
1 change: 1 addition & 0 deletions lib/logflare_grpc/opentelemetry/proto/logs/v1/logs.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,5 @@ defmodule Opentelemetry.Proto.Logs.V1.LogRecord do
field :flags, 8, type: :fixed32
field :trace_id, 9, type: :bytes, json_name: "traceId"
field :span_id, 10, type: :bytes, json_name: "spanId"
field :event_name, 12, type: :string, json_name: "eventName"
end
Original file line number Diff line number Diff line change
Expand Up @@ -66,46 +66,44 @@ defmodule Opentelemetry.Proto.Profiles.V1development.Profile do
type: Opentelemetry.Proto.Profiles.V1development.Location,
json_name: "locationTable"

field :location_indices, 15, repeated: true, type: :int32, json_name: "locationIndices"
field :location_indices, 5, repeated: true, type: :int32, json_name: "locationIndices"

field :function_table, 5,
field :function_table, 6,
repeated: true,
type: Opentelemetry.Proto.Profiles.V1development.Function,
json_name: "functionTable"

field :attribute_table, 16,
field :attribute_table, 7,
repeated: true,
type: Opentelemetry.Proto.Common.V1.KeyValue,
json_name: "attributeTable"

field :attribute_units, 17,
field :attribute_units, 8,
repeated: true,
type: Opentelemetry.Proto.Profiles.V1development.AttributeUnit,
json_name: "attributeUnits"

field :link_table, 18,
field :link_table, 9,
repeated: true,
type: Opentelemetry.Proto.Profiles.V1development.Link,
json_name: "linkTable"

field :string_table, 6, repeated: true, type: :string, json_name: "stringTable"
field :drop_frames_strindex, 7, type: :int32, json_name: "dropFramesStrindex"
field :keep_frames_strindex, 8, type: :int32, json_name: "keepFramesStrindex"
field :time_nanos, 9, type: :int64, json_name: "timeNanos"
field :duration_nanos, 10, type: :int64, json_name: "durationNanos"
field :string_table, 10, repeated: true, type: :string, json_name: "stringTable"
field :time_nanos, 11, type: :int64, json_name: "timeNanos"
field :duration_nanos, 12, type: :int64, json_name: "durationNanos"

field :period_type, 11,
field :period_type, 13,
type: Opentelemetry.Proto.Profiles.V1development.ValueType,
json_name: "periodType"

field :period, 12, type: :int64
field :comment_strindices, 13, repeated: true, type: :int32, json_name: "commentStrindices"
field :default_sample_type_strindex, 14, type: :int32, json_name: "defaultSampleTypeStrindex"
field :profile_id, 19, type: :bytes, json_name: "profileId"
field :attributes, 20, repeated: true, type: Opentelemetry.Proto.Common.V1.KeyValue
field :dropped_attributes_count, 21, type: :uint32, json_name: "droppedAttributesCount"
field :original_payload_format, 22, type: :string, json_name: "originalPayloadFormat"
field :original_payload, 23, type: :bytes, json_name: "originalPayload"
field :period, 14, type: :int64
field :comment_strindices, 15, repeated: true, type: :int32, json_name: "commentStrindices"
field :default_sample_type_strindex, 16, type: :int32, json_name: "defaultSampleTypeStrindex"
field :profile_id, 17, type: :bytes, json_name: "profileId"
field :dropped_attributes_count, 19, type: :uint32, json_name: "droppedAttributesCount"
field :original_payload_format, 20, type: :string, json_name: "originalPayloadFormat"
field :original_payload, 21, type: :bytes, json_name: "originalPayload"
field :attribute_indices, 22, repeated: true, type: :int32, json_name: "attributeIndices"
end

defmodule Opentelemetry.Proto.Profiles.V1development.AttributeUnit do
Expand Down Expand Up @@ -145,54 +143,40 @@ defmodule Opentelemetry.Proto.Profiles.V1development.Sample do

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0"

field :location_index, 1, repeated: true, type: :int32, json_name: "locationIndex"
field :locations_start_index, 7, type: :int32, json_name: "locationsStartIndex"
field :locations_length, 8, type: :int32, json_name: "locationsLength"
field :value, 2, repeated: true, type: :int64
field :attribute_indices, 10, repeated: true, type: :int32, json_name: "attributeIndices"
field :link_index, 12, type: :int32, json_name: "linkIndex"
field :timestamps_unix_nano, 13, repeated: true, type: :uint64, json_name: "timestampsUnixNano"
end

defmodule Opentelemetry.Proto.Profiles.V1development.Label do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0"

field :key_strindex, 1, type: :int32, json_name: "keyStrindex"
field :str_strindex, 2, type: :int32, json_name: "strStrindex"
field :num, 3, type: :int64
field :num_unit_strindex, 4, type: :int32, json_name: "numUnitStrindex"
field :locations_start_index, 1, type: :int32, json_name: "locationsStartIndex"
field :locations_length, 2, type: :int32, json_name: "locationsLength"
field :value, 3, repeated: true, type: :int64
field :attribute_indices, 4, repeated: true, type: :int32, json_name: "attributeIndices"
field :link_index, 5, proto3_optional: true, type: :int32, json_name: "linkIndex"
field :timestamps_unix_nano, 6, repeated: true, type: :uint64, json_name: "timestampsUnixNano"
end

defmodule Opentelemetry.Proto.Profiles.V1development.Mapping do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0"

field :id, 1, type: :uint64
field :memory_start, 2, type: :uint64, json_name: "memoryStart"
field :memory_limit, 3, type: :uint64, json_name: "memoryLimit"
field :file_offset, 4, type: :uint64, json_name: "fileOffset"
field :filename_strindex, 5, type: :int32, json_name: "filenameStrindex"
field :attribute_indices, 12, repeated: true, type: :int32, json_name: "attributeIndices"
field :has_functions, 7, type: :bool, json_name: "hasFunctions"
field :has_filenames, 8, type: :bool, json_name: "hasFilenames"
field :has_line_numbers, 9, type: :bool, json_name: "hasLineNumbers"
field :has_inline_frames, 10, type: :bool, json_name: "hasInlineFrames"
field :memory_start, 1, type: :uint64, json_name: "memoryStart"
field :memory_limit, 2, type: :uint64, json_name: "memoryLimit"
field :file_offset, 3, type: :uint64, json_name: "fileOffset"
field :filename_strindex, 4, type: :int32, json_name: "filenameStrindex"
field :attribute_indices, 5, repeated: true, type: :int32, json_name: "attributeIndices"
field :has_functions, 6, type: :bool, json_name: "hasFunctions"
field :has_filenames, 7, type: :bool, json_name: "hasFilenames"
field :has_line_numbers, 8, type: :bool, json_name: "hasLineNumbers"
field :has_inline_frames, 9, type: :bool, json_name: "hasInlineFrames"
end

defmodule Opentelemetry.Proto.Profiles.V1development.Location do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0"

field :id, 1, type: :uint64
field :mapping_index, 2, type: :int32, json_name: "mappingIndex"
field :address, 3, type: :uint64
field :line, 4, repeated: true, type: Opentelemetry.Proto.Profiles.V1development.Line
field :is_folded, 5, type: :bool, json_name: "isFolded"
field :attribute_indices, 7, repeated: true, type: :int32, json_name: "attributeIndices"
field :mapping_index, 1, proto3_optional: true, type: :int32, json_name: "mappingIndex"
field :address, 2, type: :uint64
field :line, 3, repeated: true, type: Opentelemetry.Proto.Profiles.V1development.Line
field :is_folded, 4, type: :bool, json_name: "isFolded"
field :attribute_indices, 5, repeated: true, type: :int32, json_name: "attributeIndices"
end

defmodule Opentelemetry.Proto.Profiles.V1development.Line do
Expand All @@ -210,9 +194,8 @@ defmodule Opentelemetry.Proto.Profiles.V1development.Function do

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0"

field :id, 1, type: :uint64
field :name_strindex, 2, type: :int32, json_name: "nameStrindex"
field :system_name_strindex, 3, type: :int32, json_name: "systemNameStrindex"
field :filename_strindex, 4, type: :int32, json_name: "filenameStrindex"
field :start_line, 5, type: :int64, json_name: "startLine"
field :name_strindex, 1, type: :int32, json_name: "nameStrindex"
field :system_name_strindex, 2, type: :int32, json_name: "systemNameStrindex"
field :filename_strindex, 3, type: :int32, json_name: "filenameStrindex"
field :start_line, 4, type: :int64, json_name: "startLine"
end
7 changes: 3 additions & 4 deletions lib/logflare_grpc/trace/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule LogflareGrpc.Trace.Server do

use GRPC.Server,
service: Opentelemetry.Proto.Collector.Trace.V1.TraceService.Service,
compressors: [GRPC.Compressor.Gzip]
# compressors: [GRPC.Compressor.Gzip],
http_transcode: true

require Logger

Expand Down Expand Up @@ -47,10 +48,8 @@ defmodule LogflareGrpc.Trace.Server do

defp get_source_token(stream) do
case GRPC.Stream.get_headers(stream) do
%{"x-source-token" => token} -> {:ok, token}
%{"x-source-id" => token} -> {:ok, token}
%{"x-collection" => token} -> {:ok, token}
%{"x-source" => token} -> {:ok, token}
%{"x-source-uuid" => token} -> {:ok, token}
_ -> {:error, :unauthorized}
end
end
Expand Down
12 changes: 10 additions & 2 deletions lib/logflare_web/controllers/log_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ defmodule LogflareWeb.LogController do
use LogflareWeb, :controller
use OpenApiSpex.ControllerSpecs

alias Logflare.Logs
alias Logflare.Logs.Processor
alias Opentelemetry.Proto.Collector.Trace.V1.ExportTraceServiceRequest

alias LogflareWeb.OpenApi.Created
alias LogflareWeb.OpenApi.ServerError
Expand All @@ -29,8 +31,6 @@ defmodule LogflareWeb.LogController do
when action in [:browser_reports, :generic_json, :create]
)

alias Logflare.Logs

@message "Logged!"

operation(:create,
Expand Down Expand Up @@ -224,4 +224,12 @@ defmodule LogflareWeb.LogController do
{String.replace(header, "-", "_"), data}
end
end

def otel_traces(
%{assigns: %{source: source}} = conn,
%ExportTraceServiceRequest{resource_spans: spans}
) do
Processor.ingest(spans, Logs.OtelTrace, source)
|> handle(conn)
end
end
49 changes: 31 additions & 18 deletions lib/logflare_web/controllers/plugs/fetch_resource.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,9 @@ defmodule LogflareWeb.Plugs.FetchResource do
import Plug.Conn
alias Logflare.Sources
alias Logflare.Endpoints
alias Logflare.Utils
def init(_opts), do: nil

# ingest by source token
def call(%{assigns: %{resource_type: :source}, params: params} = conn, _opts)
when is_map_key(params, "source") or is_map_key(params, "collection") do
token = params["source"] || params["collection"]

source =
case is_uuid?(token) do
true ->
Sources.Cache.get_by_and_preload_rules(token: token)
|> Sources.refresh_source_metrics_for_ingest()

_ ->
nil
end

assign(conn, :source, source)
end

# ingest by source name
def call(
%{assigns: %{user: user, resource_type: :source}, params: params} =
Expand All @@ -48,6 +31,25 @@ defmodule LogflareWeb.Plugs.FetchResource do
assign(conn, :source, source)
end

# ingest by source token
def call(%{assigns: %{resource_type: :source}, params: params} = conn, _opts) do
token =
Utils.Map.get(params, :source) || Utils.Map.get(params, :collection) ||
get_source_from_headers(conn)

source =
case is_uuid?(token) do
true ->
Sources.Cache.get_by_and_preload_rules(token: token)
|> Sources.refresh_source_metrics_for_ingest()

_ ->
nil
end

assign(conn, :source, source)
end

def call(
%{
assigns: %{resource_type: :endpoint} = assigns,
Expand Down Expand Up @@ -95,4 +97,15 @@ defmodule LogflareWeb.Plugs.FetchResource do
_ -> false
end
end

defp is_uuid?(_), do: false

def get_source_from_headers(conn) do
(Plug.Conn.get_req_header(conn, "x-source") ||
Plug.Conn.get_req_header(conn, "x-collection"))
|> case do
[value] -> value
_ -> nil
end
end
end
Loading

0 comments on commit 32ea7e9

Please sign in to comment.