fix: remove Endpoint not up error logging, switch to error tuple (#2108)
* fix: remove Endpoint not up error logging, switch to error tuple

* chore: fix flaky test
Ziinc authored Jun 18, 2024
1 parent 967399b commit 8e3ee7f
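The substantive change: instead of logging "Endpoint not up yet!" from inside ChannelTopics, the broadcast helpers now return {:error, :endpoint_not_up} and leave the decision to the caller. A minimal sketch of a call site under that contract (push/3 is a hypothetical wrapper, not part of this commit):

# Hypothetical caller, written against the spec added in this commit:
# maybe_broadcast/3 :: :ok | {:error, :endpoint_not_up}
def push(topic, event, payload) do
  case Logflare.Source.ChannelTopics.maybe_broadcast(topic, event, payload) do
    :ok ->
      :ok

    {:error, :endpoint_not_up} ->
      # Endpoint has not booted yet (e.g. early in application start-up);
      # drop the broadcast instead of emitting an error log.
      :noop
  end
end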
Showing 7 changed files with 35 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/logflare/backends/buffer_producer.ex
@@ -77,7 +77,7 @@ defmodule Logflare.Backends.BufferProducer do
backend_token: state.backend_token
}

- Source.ChannelTopics.broadcast_buffer(payload)
+ Source.ChannelTopics.local_broadcast_buffer(payload)

# broadcasts local buffer map to entire cluster, local included
len = GenStage.estimate_buffered_count(pid)
28 changes: 19 additions & 9 deletions lib/logflare/source/channel_topics.ex
@@ -27,32 +27,38 @@ defmodule Logflare.Source.ChannelTopics do
LogflareWeb.Endpoint.subscribe("source:#{source_token}")
end

- def broadcast_log_count(%{log_count: log_count, source_token: source_token} = payload) do
+ def local_broadcast_log_count(%{log_count: log_count, source_token: source_token} = payload) do
payload = %{payload | log_count: Delimit.number_to_delimited(log_count)}
topic = "dashboard:#{source_token}"
event = "log_count"

- logflare_local_broadcast(topic, event, payload)
+ maybe_local_broadcast(topic, event, payload)
end

- def broadcast_buffer(%{buffer: _buffer, source_token: source_token} = payload) do
+ @doc """
+ Broadcasts the channel buffer locally
+ """
+ def local_broadcast_buffer(%{buffer: _buffer, source_token: source_token} = payload) do
topic = "dashboard:#{source_token}"
event = "buffer"

- logflare_local_broadcast(topic, event, payload)
+ maybe_local_broadcast(topic, event, payload)
end

- def broadcast_rates(payload) do
+ def local_broadcast_rates(payload) do
payload =
payload
|> Map.put(:rate, payload[:last_rate])

topic = "dashboard:#{payload.source_token}"
event = "rate"

- logflare_local_broadcast(topic, event, payload)
+ maybe_local_broadcast(topic, event, payload)
end

@doc """
Broadcasts events to all nodes
"""
def broadcast_new(events) when is_list(events), do: Enum.map(events, &broadcast_new/1)

def broadcast_new(%LE{source: %Source{token: token}, body: body} = le) do
@@ -63,10 +69,12 @@ defmodule Logflare.Source.ChannelTopics do
})
end

+ # performs a global broadcast
+ @spec maybe_broadcast(String.t(), String.t(), map()) :: :ok | {:error, :endpoint_not_up}
def maybe_broadcast(topic, event, payload) do
case :ets.whereis(LogflareWeb.Endpoint) do
:undefined ->
Logger.error("Endpoint not up yet!")
{:error, :endpoint_not_up}

_ ->
LogflareWeb.Endpoint.broadcast(
@@ -77,10 +85,12 @@ defmodule Logflare.Source.ChannelTopics do
end
end

- def logflare_local_broadcast(topic, event, payload) do
+ # performs a local broadcast
+ @spec maybe_local_broadcast(String.t(), String.t(), map()) :: :ok | {:error, :endpoint_not_up}
+ def maybe_local_broadcast(topic, event, payload) do
case :ets.whereis(LogflareWeb.Endpoint) do
:undefined ->
Logger.error("Endpoint not up yet!")
{:error, :endpoint_not_up}

_ ->
LogflareWeb.Endpoint.local_broadcast(topic, event, payload)
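For context on the rename: a Phoenix endpoint exposes both broadcast/3, which publishes through PubSub to subscribers on every node, and local_broadcast/3, which only reaches subscribers on the current node, so the local_ prefix now matches what these dashboard helpers actually do. The :ets.whereis(LogflareWeb.Endpoint) guard works because a Phoenix endpoint keeps its runtime config in a named ETS table matching the endpoint module, so :undefined indicates the endpoint has not started yet. A rough illustration of the scope difference (topic and payload values are made up):

payload = %{buffer: 3333, source_token: "some-source-token"}

# Cluster-wide: subscribers on every node receive the event.
LogflareWeb.Endpoint.broadcast("dashboard:#{payload.source_token}", "buffer", payload)

# Node-local: only subscribers connected to this node receive it.
LogflareWeb.Endpoint.local_broadcast("dashboard:#{payload.source_token}", "buffer", payload)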
2 changes: 1 addition & 1 deletion lib/logflare/source/rate_counter_server.ex
@@ -261,7 +261,7 @@ defmodule Logflare.Source.RateCounterServer do
PubSubRates.Cache.get_cluster_rates(state.source_id)
|> Map.put(:source_token, state.source_id)

- Source.ChannelTopics.broadcast_rates(cluster_rates)
+ Source.ChannelTopics.local_broadcast_rates(cluster_rates)
end

@spec get_insert_count(atom) :: {:ok, non_neg_integer()}
2 changes: 1 addition & 1 deletion lib/logflare/source/recent_logs_server.ex
@@ -223,7 +223,7 @@ defmodule Logflare.Source.RecentLogsServer do

if current_cluster_inserts > last_cluster_inserts do
payload = %{log_count: current_cluster_inserts, source_token: state.source_token}
- Source.ChannelTopics.broadcast_log_count(payload)
+ Source.ChannelTopics.local_broadcast_log_count(payload)
end

{:ok, current_cluster_inserts, current_inserts}
7 changes: 3 additions & 4 deletions test/logflare/backends_test.exs
@@ -386,10 +386,9 @@ defmodule Logflare.BackendsTest do
backend = insert(:backend, user: user)
[source1, source2] = insert_pair(:source, user: user, rules: [])

- rules =
-   for _ <- 1..250 do
-     insert(:rule, source: source2, backend: backend, lql_string: "message")
-   end
+ for _ <- 1..250 do
+   insert(:rule, source: source2, backend: backend, lql_string: "message")
+ end

source2 = Sources.preload_defaults(source2)

6 changes: 3 additions & 3 deletions test/logflare/cluster_pubsub_test.exs
@@ -54,9 +54,9 @@ defmodule Logflare.ClusterPubSubTest do

test "broadcast to dashboard", %{source: %{token: source_token}} do
ChannelTopics.subscribe_dashboard(source_token)
- ChannelTopics.broadcast_log_count(%{log_count: 1111, source_token: source_token})
- ChannelTopics.broadcast_rates(%{last_rate: 2222, source_token: source_token})
- ChannelTopics.broadcast_buffer(%{buffer: 3333, source_token: source_token})
+ ChannelTopics.local_broadcast_log_count(%{log_count: 1111, source_token: source_token})
+ ChannelTopics.local_broadcast_rates(%{last_rate: 2222, source_token: source_token})
+ ChannelTopics.local_broadcast_buffer(%{buffer: 3333, source_token: source_token})

:timer.sleep(500)
assert_received %_{event: "log_count", payload: %{log_count: "1,111"}}
13 changes: 7 additions & 6 deletions test/logflare_web/controllers/health_check_controller_test.exs
@@ -6,14 +6,15 @@ defmodule LogflareWeb.HealthCheckControllerTest do
alias Logflare.SingleTenant
alias Logflare.Source

test "normal node health check", %{conn: conn} do
start_supervised!(Source.Supervisor)
setup do
Logflare.Google.BigQuery
|> stub(:init_table!, fn _, _, _, _, _, _ -> :ok end)

assert %{"status" => "coming_up"} =
conn
|> get("/health")
|> json_response(503)
:ok
end

test "normal node health check", %{conn: conn} do
start_supervised!(Source.Supervisor)
:timer.sleep(1000)

conn = get(conn, "/health")
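The flaky-test portion moves the BigQuery init_table! stub into a setup block, so it is in place before start_supervised!(Source.Supervisor) runs, and then waits a fixed second before hitting /health. If a fixed :timer.sleep/1 ever proves fragile, a polling helper is a common alternative; a sketch, assuming the healthy response is eventually a 200 (wait_for_health/2 is hypothetical, not part of this commit):

# Hypothetical helper for the test module: retry /health until it returns
# 200 instead of sleeping for a fixed interval.
defp wait_for_health(conn, attempts \\ 20) do
  resp = get(conn, "/health")

  cond do
    resp.status == 200 ->
      json_response(resp, 200)

    attempts > 0 ->
      Process.sleep(100)
      wait_for_health(conn, attempts - 1)

    true ->
      flunk("health check did not return 200 in time")
  end
end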
