Skip to content

Commit

Permalink
feat: cross-node cache warming
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc committed Dec 4, 2023
1 parent 0ca0c77 commit b19575a
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 29 deletions.
18 changes: 17 additions & 1 deletion lib/logflare/billing/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Billing.Cache do
alias Logflare.Billing

require Logger
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get_billing_account_by(keyword) do
Expand Down
69 changes: 69 additions & 0 deletions lib/logflare/cluster/cache_warmer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule Logflare.Cluster.CacheWarmer do
@moduledoc """
Performs cross-node cache warming, by retrieving all cache data from the other node and setting it on the cache.
"""
use Cachex.Warmer
import Cachex.Spec

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(args) do
opts = Enum.into(args, %{cache: nil})
{:ok, opts}
end

def handle_info({:batch, cache, pairs}, state) do
Cachex.import(cache, pairs)
{:noreply, state}
end

# only on startup
@impl Cachex.Warmer
def interval, do: :timer.hours(24 * 365)
@impl Cachex.Warmer
def execute(cache) do
# stream all data from the cache
target =
Node.list()
|> Enum.map(fn node ->
case :rpc.call(node, Cachex, :count, [cache]) do
{:ok, count} when count > 0 -> node
_ -> nil
end
end)
|> Enum.filter(&(&1 != nil))
|> List.first()

if target do
pid = self()

# don't block the caller, using :rpc.async_call results in crashloop due to key message handling.
Task.start(fn ->
:rpc.call(target, __MODULE__, :stream_to_node, [pid, cache])
end)
end

{:ok, []}
end

# stream entries to the provided target node
def stream_to_node(pid, cache) do
# send message to CacheWarmer process on that node
Cachex.stream!(cache)
|> Stream.chunk_every(250)
|> Stream.each(fn chunk ->
send(pid, {:batch, cache, chunk})
end)
|> Stream.run()

:ok
end

def warmer_spec(mod) do
warmer(module: __MODULE__, state: mod)
end
end
18 changes: 16 additions & 2 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@ defmodule Logflare.ContextCache do
"""

require Logger

alias Logflare.Cluster.CacheWarmer
@cache __MODULE__

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [@cache, [stats: stats]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
@cache,
[
stats: stats,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

@spec apply_fun(atom(), tuple(), [list()]) :: any()
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/partners/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Partners.Cache do
"""

alias Logflare.Partners
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get_partner(id), do: apply_repo_fun(__ENV__.function, [id])
Expand Down
17 changes: 16 additions & 1 deletion lib/logflare/pubsub_rates/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@ defmodule Logflare.PubSubRates.Cache do

alias Logflare.Source
alias Logflare.Cluster
alias Logflare.Cluster.CacheWarmer

@cache __MODULE__
@default_bucket_width 60

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [@cache, [stats: stats]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
@cache,
[
stats: stats,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def cache_rates(source_id, rates) when is_atom(source_id) do
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/sources/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@ defmodule Logflare.Sources.Cache do
@moduledoc false

alias Logflare.Sources
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

# For ingest
Expand Down
18 changes: 17 additions & 1 deletion lib/logflare/users/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ defmodule Logflare.Users.Cache do
"""

alias Logflare.Users
alias Logflare.Cluster.CacheWarmer

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: __MODULE__, start: {Cachex, :start_link, [__MODULE__, [stats: stats, limit: 100_000]]}}

%{
id: __MODULE__,
start:
{Cachex, :start_link,
[
__MODULE__,
[
stats: stats,
limit: 100_000,
warmers: [
CacheWarmer.warmer_spec(__MODULE__)
]
]
]}
}
end

def get(id), do: apply_repo_fun(__ENV__.function, [id])
Expand Down
79 changes: 57 additions & 22 deletions test/logflare/context_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,67 @@ defmodule Logflare.ContextCacheTest do

alias Logflare.ContextCache
alias Logflare.Sources
alias Logflare.Cluster.CacheWarmer

setup do
user = insert(:user)
insert(:plan, name: "Free")
source = insert(:source, user: user)
args = [token: source.token]
source = Sources.Cache.get_by(args)
fun = :get_by
cache_key = {fun, [args]}
%{source: source, cache_key: cache_key}
end
describe "warming" do
setup do
start_supervised!(CacheWarmer)
:ok
end
test "retrieve cache data from different node" do
user = insert(:user)
[node1] = LocalCluster.start_nodes(:initial, 1,[ files: [ __ENV__.file ] ])

test "cache_name/1" do
assert Sources.Cache == ContextCache.cache_name(Sources)
end
# fetch from cache
Node.spawn(node1, fn ->
Logflare.Users.Cache.get(user.id)
end)
:timer.sleep(250)

reject(&Logflare.Repo.get/2)
[node2] = LocalCluster.start_nodes(:new, 1,[ files: [ __ENV__.file ] ])
# manually trigger cache warming
pid = self()
:timer.sleep(500)
Node.spawn(node2, fn ->
# dbg(Cachex)
{:ok, count} = Cachex.count(Logflare.Users.Cache)
send(pid, {:count, count})
end)
:timer.sleep(500)
assert_receive {:count, 1}
end

test "apply_fun/3", %{cache_key: cache_key} do
# apply_fun was called in the setup when we called `Sources.Cache.get_by/1`
# here's we're making sure it did get cached correctly
assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key)
end

test "bust_keys/1", %{source: source, cache_key: cache_key} do
assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}])
assert is_nil(Cachex.get!(Sources.Cache, cache_key))
match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"}
assert [] = :ets.match(ContextCache, match)
describe "functionality" do
setup do
user = insert(:user)
insert(:plan, name: "Free")
source = insert(:source, user: user)
args = [token: source.token]
source = Sources.Cache.get_by(args)
fun = :get_by
cache_key = {fun, [args]}
%{source: source, cache_key: cache_key}
end

test "cache_name/1" do
assert Sources.Cache == ContextCache.cache_name(Sources)
end

test "apply_fun/3", %{cache_key: cache_key} do
# apply_fun was called in the setup when we called `Sources.Cache.get_by/1`
# here's we're making sure it did get cached correctly
assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key)
end

test "bust_keys/1", %{source: source, cache_key: cache_key} do
assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}])
assert is_nil(Cachex.get!(Sources.Cache, cache_key))
match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"}
assert [] = :ets.match(ContextCache, match)
end

end
end
1 change: 1 addition & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Mimic.copy(Logflare.Logs)
Mimic.copy(Logflare.Logs.LogEvents)
Mimic.copy(Logflare.Logs.SearchQueryExecutor)
Mimic.copy(Logflare.Lql)
Mimic.copy(Logflare.Repo)
Mimic.copy(Logflare.Users)
Mimic.copy(Logflare.Sources)
Mimic.copy(Logflare.Billing)
Expand Down

0 comments on commit b19575a

Please sign in to comment.