Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several things I find useful #117

Merged
merged 6 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 48 additions & 64 deletions lib/poolex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
"""
@type run_option() :: {:checkout_timeout, timeout()}

@spawn_opts [priority: :high]

@doc """
Starts a Poolex process without links (outside of a supervision tree).

Expand All @@ -93,14 +95,14 @@
## Examples

iex> Poolex.start(pool_id: :my_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:my_pool)
iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:my_pool)
iex> worker_module
Agent
"""
@spec start(list(poolex_option())) :: GenServer.on_start()
def start(opts) do
pool_id = Keyword.fetch!(opts, :pool_id)
GenServer.start(__MODULE__, opts, name: pool_id)
GenServer.start(__MODULE__, opts, name: pool_id, spawn_opt: @spawn_opts)
end

@doc """
Expand All @@ -117,14 +119,14 @@
## Examples

iex> Poolex.start_link(pool_id: :other_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:other_pool)
iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:other_pool)
iex> worker_module
Agent
"""
@spec start_link(list(poolex_option())) :: GenServer.on_start()
def start_link(opts) do
pool_id = Keyword.fetch!(opts, :pool_id)
GenServer.start_link(__MODULE__, opts, name: pool_id)
GenServer.start_link(__MODULE__, opts, name: pool_id, spawn_opt: @spawn_opts)
end

@doc """
Expand Down Expand Up @@ -202,23 +204,10 @@
end
end

@doc """
Returns current state of started pool.

Primarily needed to help with debugging. **Avoid using this function in production.**

## Examples

iex> Poolex.start(pool_id: :my_pool_2, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> state = %Poolex.Private.State{} = Poolex.get_state(:my_pool_2)
iex> state.worker_module
Agent
iex> is_pid(state.supervisor)
true
"""
@spec get_state(pool_id()) :: State.t()
def get_state(pool_id) do
GenServer.call(pool_id, :get_state)
@deprecated "Use :sys.get_state/1 instead"
@doc false
def get_state(name) do
:sys.get_state(name)
end

@doc """
Expand Down Expand Up @@ -300,21 +289,19 @@
waiting_callers_impl =
Keyword.get(opts, :waiting_callers_impl, Poolex.Callers.Impl.ErlangQueue)

{:ok, monitor_pid} = Monitoring.init()
{:ok, supervisor} = Poolex.Private.Supervisor.start_link()

state =
%State{
max_overflow: max_overflow,
monitor_pid: monitor_pid,
pool_id: pool_id,
supervisor: supervisor,
worker_args: worker_args,
worker_module: worker_module,
worker_start_fun: worker_start_fun
}

initial_workers_pids = start_workers(workers_count, state)
{initial_workers_pids, state} = start_workers(workers_count, state)

state =
state
Expand All @@ -332,25 +319,24 @@
{:noreply, state}
end

@spec start_workers(non_neg_integer(), State.t()) :: [pid]
defp start_workers(0, _state) do
[]
@spec start_workers(non_neg_integer(), State.t()) :: {[pid], State.t()}
defp start_workers(0, state) do
{[], state}
end

defp start_workers(workers_count, _state) when workers_count < 0 do
msg = "workers_count must be non negative number, received: #{inspect(workers_count)}"
raise ArgumentError, msg
end

defp start_workers(workers_count, state) do
Enum.map(1..workers_count, fn _ ->
defp start_workers(workers_count, state) when is_integer(workers_count) and workers_count >= 1 do
Enum.map_reduce(1..workers_count, state, fn _iterator, state ->
{:ok, worker_pid} = start_worker(state)
Monitoring.add(state.monitor_pid, worker_pid, :worker)

worker_pid
state = Monitoring.add(state, worker_pid, :worker)
{worker_pid, state}
end)
end

defp start_workers(workers_count, _state) do
msg = "workers_count must be non negative integer, received: #{inspect(workers_count)}"
raise ArgumentError, msg
end

@spec start_worker(State.t()) :: {:ok, pid()}
defp start_worker(%State{} = state) do
DynamicSupervisor.start_child(state.supervisor, %{
Expand All @@ -371,16 +357,17 @@
if state.overflow < state.max_overflow do
{:ok, new_worker} = start_worker(state)

Monitoring.add(state.monitor_pid, new_worker, :worker)

state = BusyWorkers.add(state, new_worker)
state =
state
|> Monitoring.add(new_worker, :worker)
|> BusyWorkers.add(new_worker)

{:reply, {:ok, new_worker}, %State{state | overflow: state.overflow + 1}}
else
Monitoring.add(state.monitor_pid, from_pid, :waiting_caller)

state =
WaitingCallers.add(state, %Poolex.Caller{reference: caller_reference, from: caller})
state =
state
|> Monitoring.add(from_pid, :waiting_caller)
|> WaitingCallers.add(%Poolex.Caller{reference: caller_reference, from: caller})

{:noreply, state}
end
Expand All @@ -392,10 +379,6 @@
end
end

def handle_call(:get_state, _from, state) do
{:reply, state, state}
end

def handle_call(:get_debug_info, _from, %State{} = state) do
debug_info = %DebugInfo{
busy_workers_count: BusyWorkers.count(state),
Expand All @@ -418,14 +401,14 @@

@impl GenServer
def handle_call({:add_idle_workers, workers_count}, _from, %State{} = state) do
new_state =
workers_count
|> start_workers(state)
|> Enum.reduce(state, fn worker, acc_state ->
{workers, state} = start_workers(workers_count, state)

state =
Enum.reduce(workers, state, fn worker, acc_state ->
IdleWorkers.add(acc_state, worker)
end)

{:reply, :ok, new_state}
{:reply, :ok, state}
end

@impl GenServer
Expand Down Expand Up @@ -459,11 +442,11 @@

@impl GenServer
def handle_info({:DOWN, monitoring_reference, _process, dead_process_pid, _reason}, %State{} = state) do
case Monitoring.remove(state.monitor_pid, monitoring_reference) do
:worker ->
case Monitoring.remove(state, monitoring_reference) do
{:worker, state} ->
{:noreply, handle_down_worker(state, dead_process_pid)}

:waiting_caller ->
{:waiting_caller, state} ->
{:noreply, handle_down_waiting_caller(state, dead_process_pid)}
end
end
Expand Down Expand Up @@ -507,16 +490,18 @@
else
{:ok, new_worker} = start_worker(state)

Monitoring.add(state.monitor_pid, new_worker, :worker)

IdleWorkers.add(state, new_worker)
state =

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 24 / Elixir 1.11

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 22 / Elixir 1.9

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 22 / Elixir 1.8

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 25 / Elixir 1.14

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 24 / Elixir 1.12

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 26 / Elixir 1.15

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 23 / Elixir 1.10

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 493 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 25 / Elixir 1.13

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)
state
|> Monitoring.add(new_worker, :worker)
|> IdleWorkers.add(new_worker)
end
else
{:ok, new_worker} = start_worker(state)
Monitoring.add(state.monitor_pid, new_worker, :worker)

state
|> BusyWorkers.add(new_worker)

state =

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 24 / Elixir 1.11

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 22 / Elixir 1.9

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 22 / Elixir 1.8

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 25 / Elixir 1.14

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 24 / Elixir 1.12

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 26 / Elixir 1.15

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 23 / Elixir 1.10

variable "state" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 501 in lib/poolex.ex

View workflow job for this annotation

GitHub Actions / OTP 25 / Elixir 1.13

variable "state" is unused (there is a variable with the same name in the context, use the pin operator (^) to match on it or prefix this variable with underscore if it is not meant to be used)
state
|> Monitoring.add(new_worker, :worker)
|> BusyWorkers.add(new_worker)
|> provide_worker_to_waiting_caller(new_worker)
end
end
Expand All @@ -529,7 +514,6 @@
@impl GenServer
def terminate(reason, %State{} = state) do
DynamicSupervisor.stop(state.supervisor, reason)
Monitoring.stop(state.monitor_pid)

:ok
end
Expand Down
36 changes: 10 additions & 26 deletions lib/poolex/private/monitoring.ex
Original file line number Diff line number Diff line change
@@ -1,42 +1,26 @@
defmodule Poolex.Private.Monitoring do
@moduledoc false
@type kind_of_process() :: :worker | :waiting_caller

@spec init() :: {:ok, pid()}
@doc """
Create new monitoring references storage.
"""
def init do
Agent.start_link(fn -> %{} end)
end
alias Poolex.Private.State

@spec stop(pid()) :: :ok
@doc """
Delete storage.
"""
def stop(pid) do
Agent.stop(pid)
end
@type kind_of_process() :: :worker | :waiting_caller

@spec add(monitor_pid :: pid(), worker_pid :: pid(), kind_of_process()) :: :ok
@spec add(State.t(), worker_pid :: pid(), kind_of_process()) :: State.t()
@doc """
Start monitoring given worker or caller process.
"""
def add(monitor_pid, process_pid, kind_of_process) do
def add(%{monitors: monitors} = state, process_pid, kind_of_process) do
reference = Process.monitor(process_pid)

Agent.update(monitor_pid, fn state -> Map.put(state, reference, kind_of_process) end)
%{state | monitors: Map.put(monitors, reference, kind_of_process)}
end

@spec remove(monitor_pid :: pid(), reference()) :: kind_of_process()
@spec remove(State.t(), reference()) :: {kind_of_process(), State.t()}
@doc """
Stop monitoring given worker or caller process and return kind of it.
"""
def remove(monitor_pid, monitoring_reference) do
def remove(%{monitors: monitors} = state, monitoring_reference) do
true = Process.demonitor(monitoring_reference)

Agent.get_and_update(monitor_pid, fn state ->
{Map.get(state, monitoring_reference), Map.delete(state, monitoring_reference)}
end)
kind_of_process = Map.get(monitors, monitoring_reference)
state = %{state | monitors: Map.delete(monitors, monitoring_reference)}
{kind_of_process, state}
end
end
42 changes: 24 additions & 18 deletions lib/poolex/private/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,39 @@ defmodule Poolex.Private.State do
Can be used for debugging.
"""

defstruct busy_workers_impl: nil,
busy_workers_state: nil,
idle_workers_impl: nil,
idle_workers_state: nil,
max_overflow: 0,
monitor_pid: nil,
overflow: 0,
pool_id: nil,
supervisor: nil,
waiting_callers_impl: nil,
waiting_callers_state: nil,
worker_args: [],
worker_module: nil,
worker_start_fun: nil
@enforce_keys [
:max_overflow,
:pool_id,
:supervisor,
:worker_args,
:worker_module,
:worker_start_fun
]

defstruct @enforce_keys ++
[
busy_workers_impl: nil,
busy_workers_state: nil,
idle_workers_impl: nil,
idle_workers_state: nil,
monitors: %{},
overflow: 0,
waiting_callers_impl: nil,
waiting_callers_state: nil
]

@type t() :: %__MODULE__{
busy_workers_impl: module(),
busy_workers_state: Poolex.Workers.Behaviour.state(),
busy_workers_state: nil | Poolex.Workers.Behaviour.state(),
idle_workers_impl: module(),
idle_workers_state: Poolex.Workers.Behaviour.state(),
idle_workers_state: nil | Poolex.Workers.Behaviour.state(),
max_overflow: non_neg_integer(),
monitor_pid: pid(),
monitors: %{reference() => Poolex.Private.Monitoring.kind_of_process()},
overflow: non_neg_integer(),
pool_id: Poolex.pool_id(),
supervisor: pid(),
waiting_callers_impl: module(),
waiting_callers_state: Poolex.Callers.Behaviour.state(),
waiting_callers_state: nil | Poolex.Callers.Behaviour.state(),
worker_args: list(any()),
worker_module: module(),
worker_start_fun: atom()
Expand Down
8 changes: 4 additions & 4 deletions test/poolex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ defmodule PoolexTest do
pool_name = start_pool(worker_module: SomeWorker, workers_count: 1)
pool_pid = Process.whereis(pool_name)

state = Poolex.get_state(pool_name)
state = :sys.get_state(pool_name)

supervisor_pid = state.supervisor
{:ok, worker_pid} = Poolex.run(pool_name, fn pid -> pid end)
Expand All @@ -559,7 +559,7 @@ defmodule PoolexTest do
pool_name = start_pool(worker_module: SomeWorker, workers_count: 1)
pool_pid = Process.whereis(pool_name)

state = Poolex.get_state(pool_name)
state = :sys.get_state(pool_name)

supervisor_pid = state.supervisor
{:ok, worker_pid} = Poolex.run(pool_name, fn pid -> pid end)
Expand Down Expand Up @@ -660,7 +660,7 @@ defmodule PoolexTest do
]}
)

state = Poolex.get_state({:global, :biba})
state = :sys.get_state({:global, :biba})

assert state.pool_id == {:global, :biba}

Expand All @@ -673,7 +673,7 @@ defmodule PoolexTest do

ExUnit.Callbacks.start_supervised({Poolex, [pool_id: name, worker_module: SomeWorker, workers_count: 5]})

state = Poolex.get_state(name)
state = :sys.get_state(name)

assert state.pool_id == name

Expand Down
Loading