Skip to content

Commit

Permalink
fix: purge only a portion of events in queue (#2136)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc authored Jul 10, 2024
1 parent 08ce873 commit 8c88258
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
7 changes: 5 additions & 2 deletions lib/logflare/backends/ingest_event_queue/queue_janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
@default_interval 1_000
@default_remainder 100
@default_max 50_000
@default_purge_ratio 0.1

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
Expand All @@ -28,7 +29,8 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
backend_id: bid,
interval: Keyword.get(opts, :interval, @default_interval),
remainder: Keyword.get(opts, :remainder, @default_remainder),
max: Keyword.get(opts, :max, @default_max)
max: Keyword.get(opts, :max, @default_max),
purge_ratio: Keyword.get(opts, :purge_ratio, @default_purge_ratio)
}

schedule(state.interval)
Expand All @@ -44,7 +46,8 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
all_size = IngestEventQueue.get_table_size(sid_bid)

if all_size > state.max do
IngestEventQueue.truncate(sid_bid, :all, 0)
remainder = round((1 - state.purge_ratio) * all_size)
IngestEventQueue.truncate(sid_bid, :all, remainder)

Logger.warning(
"IngestEventQueue private :ets buffer exceeded max for source id=#{state.source_id}, dropping #{all_size} events",
Expand Down
25 changes: 23 additions & 2 deletions test/logflare/backends/ingest_events_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,40 @@ defmodule Logflare.Backends.IngestEventQueueTest do
assert IngestEventQueue.count_pending({source, backend}) == 0
end

test "QueueJanitor drops all if exceeds max" do
test "QueueJanitor purges if exceeds max" do
user = insert(:user)
source = insert(:source, user: user)
backend = insert(:backend, user: user)
IngestEventQueue.upsert_tid({source, backend})
batch = for _ <- 1..105, do: build(:log_event, source: source)
IngestEventQueue.add_to_table({source, backend}, batch)
assert IngestEventQueue.get_table_size({source, backend}) == 105
start_supervised!({QueueJanitor, source: source, backend: backend, interval: 100, max: 100})

start_supervised!(
{QueueJanitor, source: source, backend: backend, interval: 100, max: 100, purge_ratio: 1.0}
)

:timer.sleep(500)
assert IngestEventQueue.get_table_size({source, backend}) == 0
end

test "QueueJanitor purges based on purge ratio" do
user = insert(:user)
source = insert(:source, user: user)
backend = insert(:backend, user: user)
IngestEventQueue.upsert_tid({source, backend})
batch = for _ <- 1..100, do: build(:log_event, source: source)
IngestEventQueue.add_to_table({source, backend}, batch)
assert IngestEventQueue.get_table_size({source, backend}) == 100

start_supervised!(
{QueueJanitor, source: source, backend: backend, interval: 100, max: 90, purge_ratio: 0.5}
)

:timer.sleep(500)
assert IngestEventQueue.get_table_size({source, backend}) == 50
end

test "MapperJanitor cleans up stale tids" do
user = insert(:user)
source = insert(:source, user: user)
Expand Down

0 comments on commit 8c88258

Please sign in to comment.