Skip to content

Commit

Permalink
Add init_producer/4
Browse files Browse the repository at this point in the history
  • Loading branch information
maartenvanvliet committed Mar 18, 2024
1 parent 142da70 commit 117b8df
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions lib/gen_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1867,10 +1867,10 @@ defmodule GenStage do
def init({mod, args}) do
case mod.init(args) do
{:producer, state} ->
init_producer(mod, [], state, nil)
init_producer(mod, [], state)

{:producer, state, opts} when is_list(opts) ->
init_producer(mod, opts, state, nil)
init_producer(mod, opts, state)

{:producer, state, opts, additional_info} when is_list(opts) ->
init_producer(mod, opts, state, additional_info)
Expand Down Expand Up @@ -1904,7 +1904,7 @@ defmodule GenStage do
end
end

defp init_producer(mod, opts, state, additional_info) do
defp init_producer(mod, opts, state) do
with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts),
{:ok, buffer_size, opts} <-
Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true),
Expand All @@ -1923,13 +1923,18 @@ defmodule GenStage do
dispatcher_mod: dispatcher_mod,
dispatcher_state: dispatcher_state
}

if additional_info, do: {:ok, stage, additional_info}, else: {:ok, stage}
{:ok, stage}
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
end

defp init_producer(mod, opts, state, additional_info) do
with {:ok, stage} <- init_producer(mod, opts, state) do
{:ok, stage, additional_info}
end
end

defp init_dispatcher(opts) do
case Keyword.pop(opts, :dispatcher, GenStage.DemandDispatcher) do
{dispatcher, opts} when is_atom(dispatcher) ->
Expand Down

0 comments on commit 117b8df

Please sign in to comment.