Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cross-node cache warming #1875

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion lib/logflare/billing/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Billing.Cache do
alias Logflare.Billing

require Logger
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get_billing_account_by(keyword) do
Expand Down
71 changes: 71 additions & 0 deletions lib/logflare/cluster/cache_warmer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Logflare.Cluster.CacheWarmer do
@moduledoc """
Performs cross-node cache warming, by retrieving all cache data from the other node and setting it on the cache.


"""
use Cachex.Warmer
import Cachex.Spec
@agent __MODULE__.State

def handle_info({:batch, cache, pairs}, state) do
Cachex.import(cache, pairs)
{:noreply, state}
end

# only on startup
@impl Cachex.Warmer
def interval, do: :timer.hours(24 * 365)
@impl Cachex.Warmer
def execute(cache) do
# starts the agent if not yet started
# if already started, will return error tuple, but does not affect subsequent Agent.get
# use agent to store state as GenServer state is managed by Cachex.
Agent.start_link(fn -> %{node: nil} end, name: @agent)
prev_node = Agent.get(@agent, & &1.node)

target =
if prev_node != nil do
prev_node
else
Node.list()
|> Enum.map(fn node ->
case :rpc.call(node, Cachex, :count, [cache]) do
{:ok, count} when count > 0 -> node
_ -> nil
end
end)
|> Enum.filter(&(&1 != nil))
|> Enum.sort_by(&Atom.to_string/1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC Atom.to_string call is not needed, as it is default behaviour of comparing atoms.

|> List.first()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorting and then fetching first is the same as finding minimum.

Comment on lines +38 to +40
Copy link
Contributor

@hauleth hauleth Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|> Enum.filter(&(&1 != nil))
|> Enum.sort_by(&Atom.to_string/1)
|> List.first()
|> Enum.filter(&(&1 != nil))
|> Enum.min(&<=/2, fn -> nil end)

end

if target do
pid = self()

# don't block the caller, using :rpc.async_call results in crashloop due to key message handling.
Task.start(fn ->
:rpc.call(target, __MODULE__, :stream_to_node, [pid, cache])
end)
end

{:ok, []}
end

# stream entries to the provided target node
def stream_to_node(pid, cache) do
# send message to CacheWarmer process on that node
Cachex.stream!(cache)
|> Stream.chunk_every(250)
|> Stream.each(fn chunk ->
send(pid, {:batch, cache, chunk})
end)
|> Stream.run()

:ok
end

def warmer_spec(mod) do
warmer(module: __MODULE__, state: mod)
end
end
18 changes: 16 additions & 2 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@ defmodule Logflare.ContextCache do
"""

require Logger

alias Logflare.Cluster.CacheWarmer
@cache __MODULE__

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [@cache, [stats: stats]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
@cache,
[
stats: stats,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

@spec apply_fun(atom(), tuple(), [list()]) :: any()
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/partners/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Partners.Cache do
"""

alias Logflare.Partners
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get_partner(id), do: apply_repo_fun(__ENV__.function, [id])
Expand Down
17 changes: 16 additions & 1 deletion lib/logflare/pubsub_rates/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@ defmodule Logflare.PubSubRates.Cache do

alias Logflare.Source
alias Logflare.Cluster
alias Logflare.Cluster.CacheWarmer

@cache __MODULE__
@default_bucket_width 60

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [@cache, [stats: stats]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
@cache,
[
stats: stats,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def cache_rates(source_id, rates) when is_atom(source_id) do
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/sources/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@ defmodule Logflare.Sources.Cache do
@moduledoc false

alias Logflare.Sources
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

# For ingest
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/users/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Users.Cache do
"""

alias Logflare.Users
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get(id), do: apply_repo_fun(__ENV__.function, [id])
Expand Down
78 changes: 56 additions & 22 deletions test/logflare/context_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,65 @@ defmodule Logflare.ContextCacheTest do
alias Logflare.ContextCache
alias Logflare.Sources

setup do
user = insert(:user)
insert(:plan, name: "Free")
source = insert(:source, user: user)
args = [token: source.token]
source = Sources.Cache.get_by(args)
fun = :get_by
cache_key = {fun, [args]}
%{source: source, cache_key: cache_key}
end
describe "warming" do
setup do
Cachex.clear(Users.Cache)
:ok
end
test "retrieve cache data from different node" do
user = insert(:user)
[node1] = LocalCluster.start_nodes(:initial, 1, [ files: [ __ENV__.file ] ])

test "cache_name/1" do
assert Sources.Cache == ContextCache.cache_name(Sources)
end
# fetch from cache
Node.spawn(node1, fn ->
Cachex.clear(Logflare.Users.Cache)
Logflare.Users.Cache.get(user.id)
end)
:timer.sleep(500)

reject(&Logflare.Repo.get/2)
[node2] = LocalCluster.start_nodes(:new, 1, [ files: [ __ENV__.file ] ])
# cache warming should happen automatically on cache startup
pid = self()
:timer.sleep(500)
Node.spawn(node2, fn ->
{:ok, count} = Cachex.count(Logflare.Users.Cache)
send(pid, {:count, count})
end)
:timer.sleep(500)
assert_receive {:count, 1}
end

test "apply_fun/3", %{cache_key: cache_key} do
# apply_fun was called in the setup when we called `Sources.Cache.get_by/1`
# here's we're making sure it did get cached correctly
assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key)
end

test "bust_keys/1", %{source: source, cache_key: cache_key} do
assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}])
assert is_nil(Cachex.get!(Sources.Cache, cache_key))
match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"}
assert [] = :ets.match(ContextCache, match)
describe "functionality" do
setup do
user = insert(:user)
insert(:plan, name: "Free")
source = insert(:source, user: user)
args = [token: source.token]
source = Sources.Cache.get_by(args)
fun = :get_by
cache_key = {fun, [args]}
%{source: source, cache_key: cache_key}
end

test "cache_name/1" do
assert Sources.Cache == ContextCache.cache_name(Sources)
end

test "apply_fun/3", %{cache_key: cache_key} do
# apply_fun was called in the setup when we called `Sources.Cache.get_by/1`
# here's we're making sure it did get cached correctly
assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key)
end

test "bust_keys/1", %{source: source, cache_key: cache_key} do
assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}])
assert is_nil(Cachex.get!(Sources.Cache, cache_key))
match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"}
assert [] = :ets.match(ContextCache, match)
end

end
end
1 change: 1 addition & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Mimic.copy(Logflare.Logs)
Mimic.copy(Logflare.Logs.LogEvents)
Mimic.copy(Logflare.Logs.SearchQueryExecutor)
Mimic.copy(Logflare.Lql)
Mimic.copy(Logflare.Repo)
Mimic.copy(Logflare.Users)
Mimic.copy(Logflare.Sources)
Mimic.copy(Logflare.Billing)
Expand Down