Skip to content

Commit

Permalink
Merge pull request #1582 from Logflare/staging
Browse files Browse the repository at this point in the history
Release v1.3.12
  • Loading branch information
chasers authored Jun 15, 2023
2 parents e513a50 + de311a7 commit d2dcf8a
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 32 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.11
1.3.12
5 changes: 3 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ config :logflare, Logflare.Mailer,

config :swoosh, local: true

# use the default querying connection pool
# see application.ex for pool settings
config :tesla,
adapter:
{Tesla.Adapter.Hackney, [pool: Client.BigQuery, max_connections: 200, recv_timeout: 60_000]}
adapter: {Tesla.Adapter.Finch, name: Logflare.FinchDefault, receive_timeout: 60_000}

config :number,
delimit: [
Expand Down
13 changes: 11 additions & 2 deletions lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Logflare.Application do
{DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.RecentLogsSup},
{Registry, name: Logflare.Backends.SourceRegistry, keys: :unique},
{Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate}
]
] ++ common_children()
end

defp get_children(_) do
Expand Down Expand Up @@ -133,7 +133,7 @@ defmodule Logflare.Application do

# Startup tasks
{Task, fn -> startup_tasks() end}
] ++ conditional_children()
] ++ conditional_children() ++ common_children()
end

def conditional_children do
Expand All @@ -154,6 +154,15 @@ defmodule Logflare.Application do
:ok
end

defp common_children do
[
# Finch connection pools, using http2
{Finch, name: Logflare.FinchIngest, pools: %{:default => [protocol: :http2, count: 200]}},
{Finch, name: Logflare.FinchQuery, pools: %{:default => [protocol: :http2, count: 100]}},
{Finch, name: Logflare.FinchDefault, pools: %{:default => [protocol: :http2, count: 50]}}
]
end

def startup_tasks do
# if single tenant, insert enterprise user
Logger.info("Executing startup tasks")
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/ecto/bigquery/bq_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Logflare.BqRepo do
|> Map.merge(override)

result =
GenUtils.get_conn()
GenUtils.get_conn(:query)
|> Api.Jobs.bigquery_jobs_query(
project_id,
body: query_request
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/google/bigquery/bigquery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule Logflare.Google.BigQuery do
batch
)
when is_atom(source_id) do
conn = GenUtils.get_conn()
conn = GenUtils.get_conn(:ingest)
table_name = GenUtils.format_table_name(source_id)

body = %Model.TableDataInsertAllRequest{
Expand Down
30 changes: 28 additions & 2 deletions lib/logflare/google/bigquery/gen_utils/gen_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,16 @@ defmodule Logflare.Google.BigQuery.GenUtils do
String.replace(string, "-", "_")
end

def get_conn() do
case Goth.fetch(Logflare.Goth) do
@doc """
Dynamically builds a Tesla client connection. Switches adapter at runtime based on first arg.
Uses `Logflare.FinchDefault` by default
"""
@typep conn_type :: :ingest | :query | :default
@spec get_conn(conn_type()) :: Tesla.Env.client()
def get_conn(conn_type \\ :default) do
Goth.fetch(Logflare.Goth)
|> case do
{:ok, %Goth.Token{} = goth} ->
Connection.new(goth.token)

Expand All @@ -74,8 +82,26 @@ defmodule Logflare.Google.BigQuery.GenUtils do
# This is going to give us an unauthorized connection but we are handling it downstream.
Connection.new("")
end
# dynamically set tesla adapter
|> Map.update!(:adapter, fn _value -> build_tesla_adapter_call(conn_type) end)
end

# copy over runtime adapter building from Tesla.client/2
# https://github.com/elixir-tesla/tesla/blob/v1.7.0/lib/tesla/builder.ex#L206
defp build_tesla_adapter_call(:ingest) do
Tesla.client([], {Tesla.Adapter.Finch, name: Logflare.FinchIngest, receive_timeout: 15_000}).adapter
end

defp build_tesla_adapter_call(:query) do
Tesla.client(
[],
{Tesla.Adapter.Finch, name: Logflare.FinchQuery, receive_timeout: 60_000}
).adapter
end

# use adapter in config.exs
defp build_tesla_adapter_call(_), do: nil

@spec get_account_id(atom) :: String.t()
def get_account_id(source_id) when is_atom(source_id) do
%Logflare.Source{user_id: account_id} = Sources.get_by(token: source_id)
Expand Down
6 changes: 1 addition & 5 deletions lib/logflare_web/live/search_live/logs_search_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,7 @@ defmodule LogflareWeb.Source.SearchLV do
search_result.aggregates.rows
|> Enum.reverse()
|> Enum.map(fn la ->
Map.update!(
la,
"timestamp",
&BqSchemaHelpers.format_timestamp(&1, timezone)
)
Map.update!(la, "timestamp", &BqSchemaHelpers.format_timestamp(&1, timezone))
end)

aggs =
Expand Down
13 changes: 7 additions & 6 deletions lib/logflare_web/live/search_live/templates/logs_list.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
id="source-logs-search-list"
data-last-query-completed-at={@last_query_completed_at}
phx-hook="SourceLogsSearchList"
class="mt-4"
>
<%= if @loading do %>
<div id="logs-list" class="blurred list-unstyled console-text-list"></div>
Expand All @@ -13,12 +14,12 @@
<!-- TODO: TO BE DELETED WHEN UNDERLYING ISSUE IS FOUND -->
<li id={"log-event_#{log.id || log.body["timestamp"]}"}>
<% %{"timestamp" => timestamp, "event_message" => message} = log.body %>
<%= if @use_local_time do %>
<mark class="log-datestamp" data-timestamp={timestamp}><%= format_timestamp(timestamp, @user_local_timezone) %></mark>
<% else %>
<mark class="log-datestamp" data-timestamp={timestamp}><%= format_timestamp(timestamp) <> " UTC" %></mark>
<% end %>
<%= message %>
<mark class="log-datestamp" data-timestamp={timestamp}>
<%= if @use_local_time do
format_timestamp(timestamp, @user_local_timezone)
else
format_timestamp(timestamp) <> " UTC"
end %></mark>&nbsp;<%= message %>
<%= live_modal_show_link(component: LogflareWeb.Search.LogEventViewerComponent, modal_id: :log_event_viewer, title: "Log Event", phx_value_log_event_id: log.id, phx_value_log_event_timestamp: log.body["timestamp"]) do %>
<span>event body</span>
<% end %>
Expand Down
7 changes: 2 additions & 5 deletions lib/logflare_web/views/helpers/bq_schema_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ defmodule LogflareWeb.Helpers.BqSchema do
metadata
|> Iteraptor.map(
fn
{_, [val]} ->
val

{_, val} ->
val
{_, [val]} -> val
{_, val} -> val
end,
yield: :all
)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ defmodule Logflare.Mixfile do

# Outbound Requests
{:castore, "~> 0.1.0"},
{:finch, "~> 0.13.0"},
{:finch, "~> 0.16.0"},
{:mint, "~> 1.0"},
# {:hackney, github: "benoitc/hackney", override: true},
{:httpoison, "~> 1.4"},
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
"expo": {:hex, :expo, "0.4.1", "1c61d18a5df197dfda38861673d392e642649a9cef7694d2f97a587b2cfb319b", [:mix], [], "hexpm", "2ff7ba7a798c8c543c12550fa0e2cbc81b95d4974c65855d8d15ba7b37a1ce47"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"finch": {:hex, :finch, "0.13.0", "c881e5460ec563bf02d4f4584079e62201db676ed4c0ef3e59189331c4eddf7b", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "49957dcde10dcdc042a123a507a9c5ec5a803f53646d451db2f7dea696fba6cc"},
"finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"},
"floki": {:hex, :floki, "0.29.0", "b1710d8c93a2f860dc2d7adc390dd808dc2fb8f78ee562304457b75f4c640881", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm", "008585ce64b9f74c07d32958ec9866f4b8a124bf4da1e2941b28e41384edaaad"},
"flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
Expand Down Expand Up @@ -77,7 +77,7 @@
"money": {:hex, :money, "1.12.2", "8d294c9c3805bfeeaeeffadb3d8c9dce1be5ab1236dc4e564728badcbb79510d", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.0 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "d9cabe0549f0d815870c3832b2164afe1dab3802c76ba9cda8fd6199bd6383a9"},
"nimble_options": {:hex, :nimble_options, "0.4.0", "c89babbab52221a24b8d1ff9e7d838be70f0d871be823165c94dd3418eea728f", [:mix], [], "hexpm", "e6701c1af326a11eea9634a3b1c62b475339ace9456c1a23ec3bc9a847bca02d"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_pool": {:hex, :nimble_pool, "0.2.6", "91f2f4c357da4c4a0a548286c84a3a28004f68f05609b4534526871a22053cde", [:mix], [], "hexpm", "1c715055095d3f2705c4e236c18b618420a35490da94149ff8b580a2144f653f"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"nimble_strftime": {:hex, :nimble_strftime, "0.1.1", "b988184d1bd945bc139b2c27dd00a6c0774ec94f6b0b580083abd62d5d07818b", [:mix], [], "hexpm", "89e599c9b8b4d1203b7bb5c79eb51ef7c6a28fbc6228230b312f8b796310d755"},
"number": {:hex, :number, "1.0.3", "932c8a2d478a181c624138958ca88a78070332191b8061717270d939778c9857", [:mix], [{:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "dd397bbc096b2ca965a6a430126cc9cf7b9ef7421130def69bcf572232ca0f18"},
"oauth2": {:hex, :oauth2, "2.0.1", "70729503e05378697b958919bb2d65b002ba6b28c8112328063648a9348aaa3f", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "c64e20d4d105bcdbcbe03170fb530d0eddc3a3e6b135a87528a22c8aecf74c52"},
Expand Down
9 changes: 7 additions & 2 deletions test/logflare/logs/logs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,27 @@ defmodule Logflare.LogsTest do
describe "full ingestion pipeline test" do
test "additive schema update from log event", %{source: source} do
GoogleApi.BigQuery.V2.Api.Tabledata
|> expect(:bigquery_tabledata_insert_all, fn _conn,
|> expect(:bigquery_tabledata_insert_all, fn conn,
_project_id,
_dataset_id,
_table_name,
opts ->
assert {Tesla.Adapter.Finch, :call, [[name: Logflare.FinchIngest, receive_timeout: _]]} =
conn.adapter

[%{json: json}] = opts[:body].rows
assert json["event_message"] == "testing 123"
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
end)

GoogleApi.BigQuery.V2.Api.Tables
|> expect(:bigquery_tables_patch, fn _conn,
|> expect(:bigquery_tables_patch, fn conn,
_project_id,
_dataset_id,
_table_name,
[body: body] ->
# use default config adapter
assert conn.adapter == nil
schema = body.schema
assert %_{name: "key", type: "STRING"} = TestUtils.get_bq_field_schema(schema, "key")
{:ok, %{}}
Expand Down
5 changes: 4 additions & 1 deletion test/logflare_web/controllers/endpoints_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ defmodule LogflareWeb.EndpointsControllerTest do
|> stub(:fetch, fn _mod -> {:ok, %Goth.Token{token: "auth-token"}} end)

GoogleApi.BigQuery.V2.Api.Jobs
|> stub(:bigquery_jobs_query, fn _conn, _proj_id, _opts ->
|> stub(:bigquery_jobs_query, fn conn, _proj_id, _opts ->
assert {Tesla.Adapter.Finch, :call, [[name: Logflare.FinchQuery, receive_timeout: _]]} =
conn.adapter

{:ok, TestUtils.gen_bq_response()}
end)

Expand Down
6 changes: 5 additions & 1 deletion test/logflare_web/live_views/logs_search_lv_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,11 @@ defmodule LogflareWeb.Source.SearchLVTest do

test "run a query", %{conn: conn, source: source} do
GoogleApi.BigQuery.V2.Api.Jobs
|> stub(:bigquery_jobs_query, fn _conn, _proj_id, opts ->
|> stub(:bigquery_jobs_query, fn conn, _proj_id, opts ->
# use separate connection pool
assert {Tesla.Adapter.Finch, :call, [[name: Logflare.FinchQuery, receive_timeout: _]]} =
conn.adapter

query = opts[:body].query |> String.downcase()

if query =~ "strpos(t0.event_message, ?" do
Expand Down

0 comments on commit d2dcf8a

Please sign in to comment.