Skip to content

Commit

Permalink
fix: proper handling of large FCM push batches (#86)
Browse files Browse the repository at this point in the history
* fix: proper handling of large FCM push batches

* fix: Enum.chunk support (pre-Elixir v1.5)
  • Loading branch information
hpopp authored Aug 3, 2017
1 parent 88f4209 commit e3a3aca
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 62 deletions.
12 changes: 11 additions & 1 deletion lib/pigeon/apns/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Pigeon.APNS.Config do
config = Application.get_env(:pigeon, :apns)[name]
%{
name: name,
mode: config[:mode],
mode: mode(config[:mode]),
reconnect: Map.get(config, :reconnect, true),
cert: cert(config[:cert]),
certfile: file_path(config[:cert]),
Expand All @@ -20,6 +20,16 @@ defmodule Pigeon.APNS.Config do
}
end

def mode({:system, env_var}), do: to_mode(System.get_env(env_var))
def mode(mode), do: mode

def to_mode(nil), do: raise "APNS.Config mode is nil"
def to_mode("dev"), do: :dev
def to_mode(":dev"), do: :dev
def to_mode("prod"), do: :prod
def to_mode(":prod"), do: :prod
def to_mode(other), do: raise "APNS.Config mode is #{inspect(other)}"

def file_path(nil), do: nil
def file_path(path) when is_binary(path) do
cond do
Expand Down
72 changes: 39 additions & 33 deletions lib/pigeon/fcm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,51 @@ defmodule Pigeon.FCM do
require Logger
import Supervisor.Spec

alias Pigeon.FCM.NotificationResponse
alias Pigeon.FCM.{Notification, NotificationResponse}

@default_timeout 5_000
@default_worker :fcm_default
@chunk_size 1_000

@spec push(Notification.t, Keyword.t) :: {:ok, NotificationResponse.t}
@spec push([Notification.t, ...], Keyword.t) :: [NotificationResponse.t, ...]
def push(notification, opts \\ [])
def push(notification, opts) when is_list(notification) do
case opts[:on_response] do
nil ->
ref = :erlang.make_ref
pid = self()
for n <- notification do
on_response = fn(x) -> send pid, {ref, x} end
send_push(n, on_response, opts)
end
Enum.foldl(notification, %{}, fn(_n, acc) ->
receive do
{^ref, %NotificationResponse{message_id: id} = response} ->
if Map.has_key?(acc, id) do
%{acc | id => merge(response, acc[:message_id])}
else
Map.merge(%{id => response}, acc)
end
after 5_000 ->
acc
end
end)
on_response -> send_push(notification, on_response, opts)
tasks = for n <- notification, do: Task.async(fn -> do_sync_push(n, opts) end)
tasks
|> Task.yield_many(@default_timeout + 10_000)
|> Enum.map(&task_mapper(&1))
on_response ->
for n <- notification, do: send_push(n, on_response, opts)
end
end

def push(notification, opts) do
case opts[:on_response] do
nil -> do_sync_push(notification, opts)
on_response -> send_push(notification, on_response, opts)
end
end

defp task_mapper({task, result}) do
case result do
nil -> Task.shutdown(task, :brutal_kill)
{:ok, {:ok, response}} -> {:ok, response}
{:ok, {:error, :timeout, notification}} -> {:error, notification}
end
end

@doc """
Sends a push over FCM.
"""
def send_push(notification, on_response, opts) do
worker_name = opts[:to] || @default_worker
notification
|> encode_requests()
|> Enum.map(& GenServer.cast(worker_name, generate_envelope(&1, on_response, opts)))
end

defp do_sync_push(notification, opts) do
ref = :erlang.make_ref
pid = self()
Expand Down Expand Up @@ -73,24 +80,22 @@ defmodule Pigeon.FCM do
end
def encode_requests(notification) do
notification.registration_id
|> Enum.chunk(1000, 1000, [])
|> chunk(@chunk_size, @chunk_size, [])
|> Enum.map(& encode_requests(%{notification | registration_id: &1}))
|> List.flatten
end

defp chunk(collection, chunk_size, step, padding) do
if Kernel.function_exported?(Enum, :chunk_every, 4) do
Enum.chunk_every(collection, chunk_size, step, padding)
else
Enum.chunk(collection, chunk_size, step, padding)
end
end

defp recipient_attr([regid]), do: %{"to" => regid}
defp recipient_attr(regid) when is_list(regid), do: %{"registration_ids" => regid}

@doc """
Sends a push over FCM.
"""
def send_push(notification, on_response, opts) do
worker_name = opts[:to] || @default_worker
notification
|> encode_requests()
|> Enum.map(& GenServer.cast(worker_name, generate_envelope(&1, on_response, opts)))
end

def start_connection(opts \\ [])
def start_connection(name) when is_atom(name) do
config = Application.get_env(:pigeon, :fcm)[name]
Expand Down Expand Up @@ -121,7 +126,8 @@ defmodule Pigeon.FCM do
is_map(value_1) -> merge(value_1, value_2)
is_nil(value_1) -> value_2
is_nil(value_2) -> value_1
true -> value_1 ++ value_2
is_list(value_1) && is_list(value_2) -> value_1 ++ value_2
true -> [value_1] ++ [value_2]
end
end)
end
Expand Down
63 changes: 40 additions & 23 deletions lib/pigeon/fcm/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -184,32 +184,46 @@ defmodule Pigeon.FCM.Worker do
end
end

def process_end_stream(%Pigeon.Http2.Stream{id: stream_id, headers: headers, body: body},
def process_end_stream(%Pigeon.Http2.Stream{id: stream_id, headers: headers, body: body, error: error} = stream,
%{socket: _socket, queue: queue} = state) do

{registration_ids, on_response} = queue["#{stream_id}"]
case get_status(headers) do
"200" ->
result = Poison.decode!(body)
parse_result(registration_ids, result, on_response)
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
nil ->
cond do
queue["#{stream_id}"] == nil ->
Logger.error("Unknown stream_id: #{stream_id}, Error: #{inspect(error)}")
{:noreply, state}
"401" ->
log_error("401", "Unauthorized")
unless on_response == nil do on_response.({:error, :unauthorized}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
"400" ->
log_error("400", "Malformed JSON")
unless on_response == nil do on_response.({:error, :malformed_json}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
code ->
reason = parse_error(body)
log_error(code, reason)
unless on_response == nil do on_response.({:error, reason}) end
error == nil ->
{registration_ids, on_response} = queue["#{stream_id}"]
case get_status(headers) do
nil ->
stream |> inspect() |> Logger.error
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
"200" ->
result = Poison.decode!(body)
parse_result(registration_ids, result, on_response)
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
"401" ->
log_error("401", "Unauthorized")
unless on_response == nil do on_response.({:error, :unauthorized}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
"400" ->
log_error("400", "Malformed JSON")
unless on_response == nil do on_response.({:error, :malformed_json}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
code ->
reason = parse_error(body)
log_error(code, reason)
unless on_response == nil do on_response.({:error, reason}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
end
true ->
{_registration_ids, on_response} = queue["#{stream_id}"]
error |> inspect() |> Logger.error
unless on_response == nil do on_response.({:error, :unavailable}) end
new_queue = Map.delete(queue, "#{stream_id}")
{:noreply, %{state | queue: new_queue}}
end
Expand All @@ -222,6 +236,9 @@ defmodule Pigeon.FCM.Worker do
ResultParser.parse(ids, results, on_response, %NotificationResponse{})
end

defp get_status(nil) do
nil
end
defp get_status(headers) do
case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do
{":status", status} -> status
Expand Down
5 changes: 3 additions & 2 deletions lib/pigeon/http2_client/kadabra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ defmodule Pigeon.Http2.Client.Kadabra do

def handle_end_stream({:end_stream, %{id: id,
headers: headers,
body: body}}, _state) do
body: body,
error: error}}, _state) do

{:ok, %Pigeon.Http2.Stream{id: id, headers: headers, body: body}}
{:ok, %Pigeon.Http2.Stream{id: id, headers: headers, body: body, error: error}}
end
def handle_end_stream(msg, _state), do: msg
end
2 changes: 1 addition & 1 deletion lib/pigeon/http2_stream.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
defmodule Pigeon.Http2.Stream do
defstruct id: nil, headers: nil, body: nil
defstruct id: nil, headers: nil, body: nil, error: nil
end
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Pigeon.Mixfile do
def project do
[app: :pigeon,
name: "Pigeon",
version: "1.0.2",
version: "1.0.3",
elixir: "~> 1.2",
source_url: "https://github.com/codedge-llc/pigeon",
description: description(),
Expand All @@ -31,7 +31,7 @@ defmodule Pigeon.Mixfile do
defp deps do
[{:poison, "~> 2.0 or ~> 3.0"},
{:httpoison, "~> 0.7"},
{:kadabra, "~> 0.2.1", optional: true},
{:kadabra, "~> 0.2.2", optional: true},
{:dogma, "~> 0.1", only: :dev},
{:earmark, "~> 1.0", only: :dev},
{:ex_doc, "~> 0.2", only: :dev},
Expand Down

0 comments on commit e3a3aca

Please sign in to comment.