Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement add_idle_workers!/2 #97

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added function `add_idle_workers!/2` for adding some idle workers to the initialized pool in runtime.

### Changed

- Refactored private `start_workers` function. It no longer accepts monitor_id as it already is in the state.

## [0.9.0] - 2024-04-24

### Added
Expand Down
41 changes: 34 additions & 7 deletions lib/poolex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,21 @@ defmodule Poolex do
GenServer.call(pool_id, :get_debug_info)
end

@doc """
Adds some idle workers to existing pool.
"""
@spec add_idle_workers!(pool_id(), pos_integer()) :: :ok | no_return()
def add_idle_workers!(_pool_id, workers_count) when workers_count < 1 do
message = "workers_count must be positive number, received: #{inspect(workers_count)}"

raise ArgumentError, message
end

def add_idle_workers!(pool_id, workers_count)
when is_atom(pool_id) and is_integer(workers_count) do
GenServer.call(pool_id, {:add_idle_workers, workers_count})
end

@impl GenServer
def init(opts) do
Process.flag(:trap_exit, true)
Expand Down Expand Up @@ -285,7 +300,7 @@ defmodule Poolex do
worker_start_fun: worker_start_fun
}

initial_workers_pids = start_workers(workers_count, state, monitor_id)
initial_workers_pids = start_workers(workers_count, state)

state =
state
Expand All @@ -303,20 +318,20 @@ defmodule Poolex do
{:noreply, state}
end

@spec start_workers(non_neg_integer(), State.t(), Monitoring.monitor_id()) :: [pid]
defp start_workers(0, _state, _monitor_id) do
@spec start_workers(non_neg_integer(), State.t()) :: [pid]
defp start_workers(0, _state) do
[]
end

defp start_workers(workers_count, _state, _monitor_id) when workers_count < 0 do
defp start_workers(workers_count, _state) when workers_count < 0 do
msg = "workers_count must be non negative number, received: #{inspect(workers_count)}"
raise ArgumentError, msg
end

defp start_workers(workers_count, state, monitor_id) do
defp start_workers(workers_count, state) do
Enum.map(1..workers_count, fn _ ->
{:ok, worker_pid} = start_worker(state)
Monitoring.add(monitor_id, worker_pid, :worker)
Monitoring.add(state.monitor_id, worker_pid, :worker)

worker_pid
end)
Expand Down Expand Up @@ -367,7 +382,7 @@ defmodule Poolex do
{:reply, state, state}
end

def handle_call(:get_debug_info, _form, %State{} = state) do
def handle_call(:get_debug_info, _from, %State{} = state) do
debug_info = %DebugInfo{
busy_workers_count: BusyWorkers.count(state),
busy_workers_impl: state.busy_workers_impl,
Expand All @@ -387,6 +402,18 @@ defmodule Poolex do
{:reply, debug_info, state}
end

@impl GenServer
def handle_call({:add_idle_workers, workers_count}, _from, %State{} = state) do
new_state =
workers_count
|> start_workers(state)
|> Enum.reduce(state, fn worker, acc_state ->
IdleWorkers.add(acc_state, worker)
end)

{:reply, :ok, new_state}
end

@impl GenServer
def handle_cast({:release_busy_worker, worker}, %State{} = state) do
if WaitingCallers.empty?(state) do
Expand Down
34 changes: 31 additions & 3 deletions test/poolex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule PoolexTest do

doctest Poolex

alias Poolex.Private.DebugInfo

describe "debug info" do
test "valid after initialization" do
initial_fun = fn -> 0 end
Expand All @@ -12,7 +14,7 @@ defmodule PoolexTest do

debug_info = Poolex.get_debug_info(pool_name)

assert debug_info.__struct__ == Poolex.Private.DebugInfo
assert debug_info.__struct__ == DebugInfo
assert debug_info.busy_workers_count == 0
assert debug_info.busy_workers_impl == Poolex.Workers.Impl.List
assert debug_info.busy_workers_pids == []
Expand Down Expand Up @@ -51,7 +53,7 @@ defmodule PoolexTest do

debug_info = Poolex.get_debug_info(pool_name)

assert debug_info.__struct__ == Poolex.Private.DebugInfo
assert debug_info.__struct__ == DebugInfo
assert debug_info.busy_workers_count == 0
assert Enum.empty?(debug_info.busy_workers_pids)
assert debug_info.idle_workers_count == 5
Expand Down Expand Up @@ -80,7 +82,7 @@ defmodule PoolexTest do

debug_info = Poolex.get_debug_info(pool_name)

assert debug_info.__struct__ == Poolex.Private.DebugInfo
assert debug_info.__struct__ == DebugInfo
assert debug_info.busy_workers_count == 1
assert Enum.count(debug_info.busy_workers_pids) == 1
assert debug_info.idle_workers_count == 4
Expand Down Expand Up @@ -582,4 +584,30 @@ defmodule PoolexTest do
assert elem(message_3, 3) == pool_pid
end
end

describe "add_idle_workers!/2" do
test "adds idle workers to pool" do
initial_fun = fn -> 0 end

pool_name = start_pool(worker_module: Agent, worker_args: [initial_fun], workers_count: 5)

assert %DebugInfo{idle_workers_count: 5} = Poolex.get_debug_info(pool_name)
assert :ok = Poolex.add_idle_workers!(pool_name, 5)
assert %DebugInfo{idle_workers_count: 10} = Poolex.get_debug_info(pool_name)
end

test "raises error on non positive workers_count" do
initial_fun = fn -> 0 end

pool_name = start_pool(worker_module: Agent, worker_args: [initial_fun], workers_count: 5)

assert_raise(ArgumentError, fn ->
Poolex.add_idle_workers!(pool_name, -1)
end)

assert_raise(ArgumentError, fn ->
Poolex.add_idle_workers!(pool_name, 0)
end)
end
end
end