Skip to content

Commit

Permalink
Merge branch 'yordis-fix-260'
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Jan 9, 2024
2 parents a3adc76 + b0a5e02 commit 07bb96f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.16.0-otp-26
erlang 26.2.1
erlang 26.2.1
56 changes: 28 additions & 28 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
defmodule EventStore.Storage.Appender do
@moduledoc false

require Logger

alias EventStore.{RecordedEvent, UUID}
alias EventStore.Sql.Statements

require Logger

@doc """
Append the given list of events to storage.
Expand All @@ -15,7 +14,7 @@ defmodule EventStore.Storage.Appender do
Returns `:ok` on success, `{:error, reason}` on failure.
"""
def append(conn, stream_id, events, opts) do
stream_uuid = stream_uuid(events)
[%RecordedEvent{stream_uuid: stream_uuid} | _] = events

try do
events
Expand Down Expand Up @@ -108,11 +107,13 @@ defmodule EventStore.Storage.Appender do
end

stream_id_or_uuid = stream_id || stream_uuid
parameters = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)
params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)

conn
|> Postgrex.query(statement, parameters, opts)
|> handle_response()
case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found}
{:ok, %Postgrex.Result{}} -> :ok
{:error, error} -> handle_error(error)
end
end

defp build_insert_parameters(events) do
Expand Down Expand Up @@ -147,41 +148,40 @@ defmodule EventStore.Storage.Appender do
defp insert_link_events(conn, params, event_count, schema, opts) do
statement = Statements.insert_link_events(schema, event_count)

conn
|> Postgrex.query(statement, params, opts)
|> handle_response()
case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found}
{:ok, %Postgrex.Result{}} -> :ok
{:error, error} -> handle_error(error)
end
end

defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}), do: {:error, :not_found}
defp handle_response({:ok, %Postgrex.Result{}}), do: :ok

defp handle_response({:error, %Postgrex.Error{} = error}) do
%Postgrex.Error{postgres: %{code: error_code, constraint: constraint}} = error
defp handle_error(%Postgrex.Error{} = error) do
%Postgrex.Error{postgres: postgres} = error

case {error_code, constraint} do
{:foreign_key_violation, _constraint} ->
case postgres do
%{code: :foreign_key_violation} ->
{:error, :not_found}

{:unique_violation, "events_pkey"} ->
%{code: :unique_violation, constraint: "events_pkey"} ->
{:error, :duplicate_event}

{:unique_violation, "stream_events_pkey"} ->
%{code: :unique_violation, constraint: "stream_events_pkey"} ->
{:error, :duplicate_event}

{:unique_violation, "ix_streams_stream_uuid"} ->
# EventStore.Streams.Stream will retry when it gets this
# error code. That will always work because on the second
# time around, the stream will have been made, so the
# race to create the stream will have been resolved.
%{code: :unique_violation, constraint: "ix_streams_stream_uuid"} ->
# EventStore.Streams.Stream will retry when it gets this error code. That will always work
# because on the second time around, the stream will have been made, so the race to create
# the stream will have been resolved.
{:error, :duplicate_stream_uuid}

{:unique_violation, _constraint} ->
%{code: :unique_violation} ->
{:error, :wrong_expected_version}

{error_code, _constraint} ->
%{code: error_code} ->
{:error, error_code}
end
end

defp stream_uuid([event | _]), do: event.stream_uuid
# Return all other errors to the caller
defp handle_error(error), do: {:error, error}
end
16 changes: 16 additions & 0 deletions test/storage/append_events_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ defmodule EventStore.Storage.AppendEventsTest do
end
end

test "append event to schema which does not exist", %{conn: conn} do
recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4())

assert {:error, :undefined_table} =
Appender.append(conn, 1, recorded_events, schema: "doesnotexist")
end

test "append single event with a db connection error", %{conn: conn, schema: schema} do
recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4())

# Using Postgrex query timeout value of zero will cause a `DBConnection.ConnectionError` error
# to be returned.
assert {:error, %DBConnection.ConnectionError{}} =
Appender.append(conn, 1, recorded_events, schema: schema, timeout: 0)
end

defp create_stream(context) do
%{conn: conn, schema: schema} = context

Expand Down

0 comments on commit 07bb96f

Please sign in to comment.