From 34b2661e8bad60825c9191def940a659ce2337a8 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Lazo Date: Fri, 4 Nov 2022 16:41:13 -0400 Subject: [PATCH 01/32] fix: handle db connection error --- lib/event_store/storage/appender.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index ec6ff83f..93e4f33e 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -183,5 +183,7 @@ defmodule EventStore.Storage.Appender do end end + defp handle_response({:error, %DBConnection.ConnectionError{}} = reply), do: reply + defp stream_uuid([event | _]), do: event.stream_uuid end From fe8fc41587cc20f844b5a0561837767f6aa00c02 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Lazo Date: Fri, 4 Nov 2022 16:44:11 -0400 Subject: [PATCH 02/32] fix: tool-versions --- .tool-versions | 2 +- test/storage/append_events_test.exs | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.tool-versions b/.tool-versions index 20322e2c..a056d77c 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ elixir 1.14.1-otp-24 -erlang 24.3.4.6 +erlang 24.3.4.2 diff --git a/test/storage/append_events_test.exs b/test/storage/append_events_test.exs index 19f83331..0ae2198f 100644 --- a/test/storage/append_events_test.exs +++ b/test/storage/append_events_test.exs @@ -4,6 +4,13 @@ defmodule EventStore.Storage.AppendEventsTest do alias EventStore.{EventFactory, RecordedEvent, UUID} alias EventStore.Storage.{Appender, CreateStream} + test "append single event with a db connection error" do + conn = start_supervised!({Postgrex, TestEventStore.config(queue_timeout: -1, queue_target: -1)}) |> dbg() + recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4()) + assert {:error, %DBConnection.ConnectionError{}} = + Appender.append(conn, 1, recorded_events, schema: "public") + end + test "append single event to new stream", %{conn: conn, schema: schema} = context do {:ok, stream_uuid, stream_id} = create_stream(context) recorded_events = EventFactory.create_recorded_events(1, stream_uuid) From 84ebd1e8917e3d0f7f9a684ddcc3e4acee25f568 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 10 Mar 2023 14:30:07 +0000 Subject: [PATCH 03/32] Remove GitHub funding --- .github/FUNDING.yml | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 .github/FUNDING.yml diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index e12fce9f..00000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1,2 +0,0 @@ -github: slashdotdash -open_collective: commanded From 13465269fefdc0ade75a3a13bb760e10842c5acc Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 10 Mar 2023 14:39:41 +0000 Subject: [PATCH 04/32] Update help section of README --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index 9841439d..57eae6d0 100644 --- a/README.md +++ b/README.md @@ -283,6 +283,4 @@ EventStore exists thanks to the following people who have contributed. ## Need help? -Please [open an issue](https://github.com/commanded/eventstore/issues) if you encounter a problem, or need assistance. - -For commercial support, and consultancy, please contact [Ben Smith](mailto:ben@10consulting.com). +Please [open an issue](https://github.com/commanded/eventstore/issues) if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the [official Elixir Slack](https://elixir-slackin.herokuapp.com/). From cbe4f52cf9428a26b936f6c21c3b794ee41fee00 Mon Sep 17 00:00:00 2001 From: Danilo Silva Date: Fri, 18 Nov 2022 15:52:01 +0100 Subject: [PATCH 05/32] fix: allow disconnect_on_error_codes to be pass to the event store connection --- lib/event_store/config.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/event_store/config.ex b/lib/event_store/config.ex index d83b7e49..1314b1eb 100644 --- a/lib/event_store/config.ex +++ b/lib/event_store/config.ex @@ -85,7 +85,8 @@ defmodule EventStore.Config do :queue_target, :queue_interval, :socket_options, - :parameters + :parameters, + :disconnect_on_error_codes ] def default_postgrex_opts(config) do From 94cff129235b05eb125f7ce340f68b12bcae6950 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 11:49:20 +0100 Subject: [PATCH 06/32] Include #263 in CHANGELOG --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9c80218..71554a51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Next release + +### Bug fixes + +* Allow `disconnect_on_error_codes` to be pass to the event store Postgrex connection ([#263](https://github.com/commanded/eventstore/pull/263)). + ## v1.4.1 ### Enhancements From 6b72a641379619cb75e69c6b4d96f892f87d8476 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 12:17:02 +0100 Subject: [PATCH 07/32] Support all Postgrex and DBConnection options --- lib/event_store/config.ex | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/lib/event_store/config.ex b/lib/event_store/config.ex index 1314b1eb..418323fa 100644 --- a/lib/event_store/config.ex +++ b/lib/event_store/config.ex @@ -68,25 +68,42 @@ defmodule EventStore.Config do end @postgrex_connection_opts [ - :username, - :password, + :after_connect, + :after_connect_timeout, + :backoff_max, + :backoff_min, + :backoff_type, + :configure, + :connect_timeout, + :connection_listeners, :database, + :disconnect_on_error_codes, + :endpoints, + :handshake_timeout, :hostname, - :configure, + :idle_interval, + :max_restarts, + :max_seconds, + :parameters, + :password, + :ping_timeout, + :pool, + :pool_size, :port, - :types, + :prepare, + :queue_interval, + :queue_target, + :search_path, + :show_sensitive_data_on_connection_error, :socket, :socket_dir, + :socket_options, :ssl, :ssl_opts, :timeout, - :pool, - :pool_size, - :queue_target, - :queue_interval, - :socket_options, - :parameters, - :disconnect_on_error_codes + :transactions, + :types, + :username ] def default_postgrex_opts(config) do From 6bb877810464924616d93d18c4e48980fa6f91d9 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 12:43:51 +0100 Subject: [PATCH 08/32] Include #273 in CHANGELOG --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71554a51..a557b201 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,10 @@ ## Next release -### Bug fixes +### Enhancements -* Allow `disconnect_on_error_codes` to be pass to the event store Postgrex connection ([#263](https://github.com/commanded/eventstore/pull/263)). +* Allow `disconnect_on_error_codes` to be passed to the event store Postgrex connection ([#263](https://github.com/commanded/eventstore/pull/263)). +* Support all `Postgrex` and `DBConnection` options when configuring an event store ([#273](https://github.com/commanded/eventstore/pull/273)). ## v1.4.1 From e4904ea400b44f2d6996c31087aae6e12284dbbc Mon Sep 17 00:00:00 2001 From: Ryan Young Date: Mon, 15 May 2023 13:39:55 -0400 Subject: [PATCH 09/32] Fix subscribe/2 dialyzer error Type mismatch when `name` is passed into the options keyword list. --- lib/event_store.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index fe6f68f4..2eab916c 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -439,7 +439,7 @@ defmodule EventStore do def subscribe(stream_uuid, opts \\ []) do name = name(opts) - PubSub.subscribe(name, stream_uuid, opts) + PubSub.subscribe(name, stream_uuid, Keyword.delete(opts, :name)) end def subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ []) do @@ -537,7 +537,7 @@ defmodule EventStore do end defp name(opts) do - case Keyword.get(opts, :name) do + case Keyword.pop(opts, :name) do nil -> __MODULE__ From a9d2a5893bb64a6827132657c316b0dd6a872b71 Mon Sep 17 00:00:00 2001 From: Ryan Young Date: Mon, 15 May 2023 13:56:10 -0400 Subject: [PATCH 10/32] use keyword.get instead of pop --- lib/event_store.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index 2eab916c..c571254a 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -537,7 +537,7 @@ defmodule EventStore do end defp name(opts) do - case Keyword.pop(opts, :name) do + case Keyword.get(opts, :name) do nil -> __MODULE__ From 50d62195e6644a554b60c44671a856d04173ecf8 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 13 Jun 2023 15:16:25 +0100 Subject: [PATCH 11/32] Terminate a monitored process only if there are no other processes registered to use it --- .github/workflows/test.yml | 4 +- .tool-versions | 4 +- lib/event_store/application.ex | 3 +- lib/event_store/monitored_server.ex | 86 ++++++++++++++------- test/monitored_server_test.exs | 110 +++++++++++++++++++++++++-- test/shared_connection_pool_test.exs | 55 ++++++++++---- test/support/observed_server.ex | 4 + 7 files changed, 210 insertions(+), 56 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 80a2e040..59fe6f55 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,8 +8,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - otp: ['24.3', '25.1'] - elixir: ['1.14.1'] + otp: ['24.3', '25.3'] + elixir: ['1.14.5'] services: postgres: diff --git a/.tool-versions b/.tool-versions index a056d77c..1a5e6c89 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.14.1-otp-24 -erlang 24.3.4.2 +elixir 1.16.0-otp-26 +erlang 26.2.1 \ No newline at end of file diff --git a/lib/event_store/application.ex b/lib/event_store/application.ex index ec3e352a..b1de94ab 100644 --- a/lib/event_store/application.ex +++ b/lib/event_store/application.ex @@ -5,7 +5,8 @@ defmodule EventStore.Application do def start(_type, _args) do children = [ - EventStore.Config.Store + EventStore.Config.Store, + {Registry, keys: :duplicate, name: MonitoredServer.Registry} ] opts = [strategy: :one_for_one, name: EventStore.Supervisor] diff --git a/lib/event_store/monitored_server.ex b/lib/event_store/monitored_server.ex index 013ed65e..c8ab6ce6 100644 --- a/lib/event_store/monitored_server.ex +++ b/lib/event_store/monitored_server.ex @@ -23,7 +23,6 @@ defmodule EventStore.MonitoredServer do :name, :backoff, :pid, - :terminate?, :shutdown, :queue, monitors: Map.new() @@ -57,14 +56,14 @@ defmodule EventStore.MonitoredServer do {delay, state} end - def on_process_start(%State{} = state, pid) do + def process_started(%State{} = state, pid) do %State{backoff: backoff} = state %State{state | backoff: Backoff.reset(backoff), pid: pid, queue: :queue.new()} end - def on_process_exit(%State{} = state) do - %State{state | pid: nil, terminate?: nil} + def process_exited(%State{} = state) do + %State{state | pid: nil} end end @@ -81,6 +80,10 @@ defmodule EventStore.MonitoredServer do GenServer.call(name, {__MODULE__, :monitor, self()}) end + def monitors(name) do + GenServer.call(name, {__MODULE__, :monitors}) + end + def init(%State{} = state) do Process.flag(:trap_exit, true) @@ -98,6 +101,12 @@ defmodule EventStore.MonitoredServer do {:reply, {:ok, ref}, state} end + def handle_call({__MODULE__, :monitors}, _from, %State{} = state) do + %State{monitors: monitors} = state + + {:reply, {:ok, monitors}, state} + end + def handle_call(msg, from, %State{pid: nil} = state) do {:noreply, enqueue({:call, msg, from}, state)} end @@ -127,12 +136,7 @@ defmodule EventStore.MonitoredServer do {:noreply, on_process_exit(pid, reason, state)} end - # Monitor process has exited so remove from monitors map. - def handle_info({:EXIT, pid, _reason}, %State{} = state) do - %State{monitors: monitors} = state - - state = %State{state | monitors: Map.delete(monitors, pid)} - + def handle_info({:EXIT, _pid, _reason}, %State{} = state) do {:noreply, state} end @@ -141,6 +145,15 @@ defmodule EventStore.MonitoredServer do {:noreply, on_process_exit(pid, reason, state)} end + def handle_info({:DOWN, _ref, :process, pid, _reason}, %State{} = state) do + %State{monitors: monitors} = state + + # Remove monitoring process from monitors + state = %State{state | monitors: Map.delete(monitors, pid)} + + {:noreply, state} + end + def handle_info(msg, %State{pid: nil} = state) do {:noreply, enqueue({:info, msg}, state)} end @@ -152,31 +165,42 @@ defmodule EventStore.MonitoredServer do end def terminate(_reason, %State{pid: nil}), do: :ok - def terminate(_reason, %State{terminate?: false}), do: :ok def terminate(reason, %State{} = state) do %State{pid: pid, shutdown: shutdown, mfa: {module, _fun, _args}} = state - Logger.debug("Monitored server #{inspect(module)} terminate due to: #{inspect(reason)}") + Logger.debug("Monitored server #{inspect(module)} terminate due to: " <> inspect(reason)) + + :ok = Registry.unregister(MonitoredServer.Registry, pid) - Process.exit(pid, reason) + if terminate?(pid) do + Process.link(pid) + Process.exit(pid, reason) - receive do - {:EXIT, ^pid, _} -> :ok - after - shutdown -> - Logger.warn( - "Monitored server #{inspect(module)} failed to terminate within #{shutdown}, killing it brutally" - ) + receive do + {:EXIT, ^pid, _reason} -> :ok + after + shutdown -> + Logger.warn( + "Monitored server #{inspect(module)} failed to terminate within #{shutdown}, killing it brutally" + ) - Process.exit(pid, :kill) + Process.exit(pid, :kill) - receive do - {:EXIT, ^pid, _} -> :ok - end + receive do + {:EXIT, ^pid, _reason} -> :ok + end + end + else + :ok end end + # Terminate the process only if there are no other processes registered to use it. + defp terminate?(pid) do + Registry.lookup(MonitoredServer.Registry, pid) == [] + end + # Attempt to start the process, retry after a delay on failure. defp start_process(%State{} = state) do %State{mfa: {module, fun, args}} = state @@ -187,7 +211,11 @@ defmodule EventStore.MonitoredServer do {:ok, pid} -> Logger.debug("Successfully started #{inspect(module)} (#{inspect(pid)})") - on_process_start(pid, %State{state | terminate?: true}) + # Unlink the process and instead use a monitor to enable it to be restarted on exit + Process.unlink(pid) + Process.monitor(pid) + + on_process_start(pid, state) {:error, {:already_started, pid}} -> Logger.debug("Monitored process already started #{inspect(module)} (#{inspect(pid)})") @@ -195,7 +223,7 @@ defmodule EventStore.MonitoredServer do # Monitor already started process to enable it to be restarted on exit Process.monitor(pid) - on_process_start(pid, %State{state | terminate?: false}) + on_process_start(pid, state) {:error, reason} -> Logger.info("Failed to start #{inspect(module)} due to: #{inspect(reason)}") @@ -207,9 +235,11 @@ defmodule EventStore.MonitoredServer do defp on_process_start(pid, %State{} = state) do %State{queue: queue} = state + {:ok, _} = Registry.register(MonitoredServer.Registry, pid, []) + :ok = forward_queued_msgs(pid, queue) - State.on_process_start(state, pid) + State.process_started(state, pid) end defp on_process_exit(pid, reason, %State{} = state) do @@ -221,7 +251,7 @@ defmodule EventStore.MonitoredServer do Process.send(monitor, {:DOWN, ref, :process, pid, reason}, []) end) - state = State.on_process_exit(state) + state = State.process_exited(state) # Attempt to restart the process delayed_start(state) diff --git a/test/monitored_server_test.exs b/test/monitored_server_test.exs index 9daafb39..bf155319 100644 --- a/test/monitored_server_test.exs +++ b/test/monitored_server_test.exs @@ -1,16 +1,19 @@ defmodule EventStore.MonitoredServerTest do use ExUnit.Case - alias EventStore.{MonitoredServer, ObservedServer, ProcessHelper} + alias EventStore.{MonitoredServer, ObservedServer, ProcessHelper, Wait} describe "monitored server" do test "should start process" do - start_monitored_process!() + {monitor, ref} = start_monitored_process!() assert_receive {:init, pid} assert Process.whereis(ObservedServer) == pid assert Process.alive?(pid) + + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref} end test "should stop observed process when monitored process stopped" do @@ -26,7 +29,7 @@ defmodule EventStore.MonitoredServerTest do end test "should restart process after exit" do - start_monitored_process!() + {monitor, ref} = start_monitored_process!() assert_receive {:init, pid1} @@ -38,6 +41,9 @@ defmodule EventStore.MonitoredServerTest do assert pid1 != pid2 refute Process.alive?(pid1) assert Process.alive?(pid2) + + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref} end test "should retry start on failure" do @@ -59,6 +65,28 @@ defmodule EventStore.MonitoredServerTest do assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown} end + test "should send `:DOWN` message after process crash" do + {monitor, ref} = start_monitored_process!() + + assert_receive {:init, pid1} + refute_receive {:DOWN, _ref, :process, _pid, _reason} + + assert :ok = GenServer.cast(MonitoredServer, :crash) + + assert_receive {:DOWN, ^ref, :process, ^pid1, {%RuntimeError{message: "crash"}, _}} + + # Should restart the crashed process + assert_receive {:init, pid2} + + assert is_pid(pid2) + assert pid1 != pid2 + refute Process.alive?(pid1) + assert Process.alive?(pid2) + + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref} + end + test "should forward calls to observed process using registered name" do start_monitored_process!() @@ -94,12 +122,15 @@ defmodule EventStore.MonitoredServerTest do assert {:ok, :pong} = GenServer.call(pid, :ping) - {monitor, _ref} = start_monitored_process!() + {monitor, ref} = start_monitored_process!() assert {:ok, :pong} = GenServer.call(monitor, :ping) + + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref} end - test "stopping monitored observer associated with an already started process should not terminate process" do + test "stopping monitored observer associated with an already started process should terminate process" do pid = start_supervised!({ObservedServer, reply_to: self(), name: ObservedServer}) start_monitored_process!() @@ -110,13 +141,13 @@ defmodule EventStore.MonitoredServerTest do :ok = stop_supervised(MonitoredServer) - refute_receive {:DOWN, ^ref, :process, ^pid, :shutdown} + assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown} end test "monitored observer should attempt to restart an already started process on exit" do pid1 = start_supervised!({ObservedServer, reply_to: self(), name: ObservedServer}) - {_pid, ref} = start_monitored_process!() + {_monitor, ref} = start_monitored_process!() assert_receive {:init, ^pid1} @@ -127,6 +158,71 @@ defmodule EventStore.MonitoredServerTest do refute pid1 == pid2 end + + test "stopping all monitored servers should terminate monitored process" do + start_supervised!( + {MonitoredServer, + mfa: {ObservedServer, :start_link, [[reply_to: self(), name: ObservedServer]]}, + name: MonitoredServer1, + backoff_min: 1, + backoff_max: 100}, + id: MonitoredServer1 + ) + + start_supervised!( + {MonitoredServer, + mfa: {ObservedServer, :start_link, [[reply_to: self(), name: ObservedServer]]}, + name: MonitoredServer2, + backoff_min: 1, + backoff_max: 100}, + id: MonitoredServer2 + ) + + assert_receive {:init, pid} + refute_received {:init, _pid} + + ref = Process.monitor(pid) + + :ok = stop_supervised(MonitoredServer1) + + # Monitored process should still be alive after only stopping one assocated monitor + refute_receive {:DOWN, ^ref, :process, ^pid, :shutdown} + + :ok = stop_supervised(MonitoredServer2) + + # Monitored process should be terminated when both monitors have been stopped + assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown} + end + + test "should remove monitor when process terminates" do + reply_to = self() + + {monitor, ref1} = start_monitored_process!() + + pid = + spawn(fn -> + {:ok, ref} = MonitoredServer.monitor(MonitoredServer) + + send(reply_to, {:monitored, ref}) + + receive do + :stop -> :ok + end + end) + + assert_receive {:monitored, ref2} + + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref1, pid => ref2} + + send(pid, :stop) + + # Should remove terminated monitoring process from monitors + Wait.until(fn -> + assert {:ok, monitors} = MonitoredServer.monitors(monitor) + assert monitors == %{self() => ref1} + end) + end end defp start_monitored_process!(opts \\ []) do diff --git a/test/shared_connection_pool_test.exs b/test/shared_connection_pool_test.exs index 0a077844..30cce230 100644 --- a/test/shared_connection_pool_test.exs +++ b/test/shared_connection_pool_test.exs @@ -53,14 +53,17 @@ defmodule EventStore.SharedConnectionPoolTest do end test "start a separate Postgrex connection for non-shared connection pools" do + shared_conn = Process.whereis(Module.concat([:shared_pool, Postgrex])) + # An event store started without specifying a connection pool should start its own pool - pid = Process.whereis(Module.concat([:eventstore3, Postgrex])) + conn = Process.whereis(Module.concat([:eventstore3, Postgrex])) - assert is_pid(pid) - assert_postgrex_connection(pid) + assert is_pid(conn) + assert_postgrex_connection(conn) + assert shared_conn != conn end - test "stopping event store with shared connection pool should start new connection" do + test "stopping event store with shared connection pool should not stop connection pool" do conn = Process.whereis(Module.concat([:shared_pool, Postgrex])) assert is_pid(conn) @@ -68,23 +71,14 @@ defmodule EventStore.SharedConnectionPoolTest do stop_supervised!(:eventstore1) - assert_receive {:DOWN, ^ref, :process, _object, _reason} - - conn = - Wait.until(fn -> - conn = Process.whereis(Module.concat([:shared_pool, Postgrex])) - assert is_pid(conn) - - conn - end) + refute_receive {:DOWN, ^ref, :process, _object, _reason} - # Ensure newly started Postgrex connection can be used + # Ensure existing Postgrex connection can still be used assert {:ok, _events} = append_events_to_stream(:eventstore2, UUID.uuid4(), 1) - ref = Process.monitor(conn) - stop_supervised!(:eventstore2) + # Stopping the last event store using the shared pool should stop the Postgrex connection assert_receive {:DOWN, ^ref, :process, _object, _reason} end @@ -148,6 +142,35 @@ defmodule EventStore.SharedConnectionPoolTest do assert_receive {:events, _events} refute_receive {:events, _events} end + + test "subscription to stream continues to receive events after event store with shared pool is stopped" do + stream_uuid = UUID.uuid4() + + {:ok, subscription} = + TestEventStore.subscribe_to_stream(stream_uuid, "subscriber2", self(), name: :eventstore2) + + assert_receive {:subscribed, ^subscription} + + {:ok, _events} = append_events_to_stream(:eventstore2, stream_uuid, 1) + + assert_receive {:events, received_events} + + :ok = TestEventStore.ack(subscription, received_events) + + refute_received {:events, _events} + + # Stop eventstore1 + stop_supervised!(:eventstore1) + + # Append new events to stream should be received via eventstore2 subscription + {:ok, _events} = append_events_to_stream(:eventstore2, stream_uuid, 1, 1) + + assert_receive {:events, received_events} + + :ok = TestEventStore.ack(subscription, received_events) + + refute_received {:events, _events} + end end # Check that this is a `Postgrex` process by executing a database query. diff --git a/test/support/observed_server.ex b/test/support/observed_server.ex index 31745c6c..84436d39 100644 --- a/test/support/observed_server.ex +++ b/test/support/observed_server.ex @@ -26,6 +26,10 @@ defmodule EventStore.ObservedServer do {:reply, {:ok, :pong}, reply_to} end + def handle_cast(:crash, _state) do + raise RuntimeError, message: "crash" + end + def handle_cast(:ping, reply_to) do send(reply_to, :pong) From 8d70f1bb3d02c6c4ab2e8031b3132bd001a6dde3 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 12:58:54 +0100 Subject: [PATCH 12/32] Include #272 in CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a557b201..22ecf9b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ * Allow `disconnect_on_error_codes` to be passed to the event store Postgrex connection ([#263](https://github.com/commanded/eventstore/pull/263)). * Support all `Postgrex` and `DBConnection` options when configuring an event store ([#273](https://github.com/commanded/eventstore/pull/273)). +### Bug fixes + +* Terminate a monitored process only if there are no other processes registered to use it ([#272](https://github.com/commanded/eventstore/pull/272)). + ## v1.4.1 ### Enhancements From c619730fbb847669d02078c7157f4965bf6d00ae Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 13:08:47 +0100 Subject: [PATCH 13/32] Include #286 in CHANGELOG --- CHANGELOG.md | 1 + README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22ecf9b2..44bda6ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Bug fixes +* Dialyzer error when calling `EventStore.subscribe/2` with `:name` option ([#268](https://github.com/commanded/eventstore/pull/268)). * Terminate a monitored process only if there are no other processes registered to use it ([#272](https://github.com/commanded/eventstore/pull/272)). ## v1.4.1 diff --git a/README.md b/README.md index 57eae6d0..42d8c650 100644 --- a/README.md +++ b/README.md @@ -273,6 +273,7 @@ EventStore exists thanks to the following people who have contributed. - [Paul Iannazzo](https://github.com/boxxxie) - [Piotr Szmielew](https://github.com/esse) - [Raphaël Lustin](https://github.com/rlustin) +- [Ryan Young](https://github.com/ryoung786) - [Samuel Roze](https://github.com/sroze) - [Simon Harris](https://github.com/harukizaemon) - [Stuart Corbishley](https://github.com/stuartc) From 6ff7cf7361ebc4f8b54bfa17d52c2d3608cc0ddf Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 13:35:58 +0100 Subject: [PATCH 14/32] Update mix dependencies --- config/test.exs | 1 + mix.exs | 8 +-- mix.lock | 20 +++---- .../notifications_reconnect_test.exs | 7 ++- .../subscription_recovery_test.exs | 56 ++++--------------- 5 files changed, 29 insertions(+), 63 deletions(-) diff --git a/config/test.exs b/config/test.exs index bd56ca0d..8a72974e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,6 +8,7 @@ config :ex_unit, refute_receive_timeout: 100 default_config = [ + idle_interval: 100, username: "postgres", password: "postgres", database: "eventstore_test", diff --git a/mix.exs b/mix.exs index 290ff842..a57520fe 100644 --- a/mix.exs +++ b/mix.exs @@ -40,16 +40,16 @@ defmodule EventStore.Mixfile do defp deps do [ {:fsm, "~> 0.3"}, - {:gen_stage, "~> 1.1"}, - {:postgrex, "~> 0.16"}, + {:gen_stage, "~> 1.2"}, + {:postgrex, "~> 0.17"}, # Optional dependencies - {:jason, "~> 1.3", optional: true}, + {:jason, "~> 1.4", optional: true}, {:poolboy, "~> 1.5", optional: true}, # Development and test tooling {:benchfella, "~> 0.3", only: :bench}, - {:dialyxir, "~> 1.2", only: :dev, runtime: false}, + {:dialyxir, "~> 1.3", only: :dev, runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev} ] end diff --git a/mix.lock b/mix.lock index 26f8cd2d..707ce61a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,20 +1,20 @@ %{ "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, - "db_connection": {:hex, :db_connection, "2.4.2", "f92e79aff2375299a16bcb069a14ee8615c3414863a6fef93156aee8e86c2ff3", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4fe53ca91b99f55ea249693a0229356a08f4d1a7931d8ffa79289b145fe83668"}, - "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, - "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.28", "0bf6546eb7cd6185ae086cbc5d20cd6dbb4b428aad14c02c49f7b554484b4586", [:mix], [], "hexpm", "501cef12286a3231dc80c81352a9453decf9586977f917a96e619293132743fb"}, + "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, + "dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.32", "fa739a0ecfa34493de19426681b23f6814573faee95dfd4b4aafe15a7b5b32c6", [:mix], [], "hexpm", "b8b0dd77d60373e77a3d7e8afa598f325e49e8663a51bcc2b88ef41838cca755"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"}, + "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, "fsm": {:hex, :fsm, "0.3.1", "087aa9b02779a84320dc7a2d8464452b5308e29877921b2bde81cdba32a12390", [:mix], [], "hexpm", "fbf0d53f89e9082b326b0b5828b94b4c549ff9d1452bbfd00b4d1ac082208e96"}, - "gen_stage": {:hex, :gen_stage, "1.1.2", "b1656cd4ba431ed02c5656fe10cb5423820847113a07218da68eae5d6a260c23", [:mix], [], "hexpm", "9e39af23140f704e2b07a3e29d8f05fd21c2aaf4088ff43cb82be4b9e3148d02"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, - "postgrex": {:hex, :postgrex, "0.16.5", "fcc4035cc90e23933c5d69a9cd686e329469446ef7abba2cf70f08e2c4b69810", [:mix], [{:connection, "~> 1.1", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "edead639dc6e882618c01d8fc891214c481ab9a3788dfe38dd5e37fd1d5fb2e8"}, - "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, + "postgrex": {:hex, :postgrex, "0.17.1", "01c29fd1205940ee55f7addb8f1dc25618ca63a8817e56fac4f6846fc2cddcbe", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "14b057b488e73be2beee508fb1955d8db90d6485c6466428fe9ccf1d6692a555"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } diff --git a/test/notifications/notifications_reconnect_test.exs b/test/notifications/notifications_reconnect_test.exs index baca63d4..04541217 100644 --- a/test/notifications/notifications_reconnect_test.exs +++ b/test/notifications/notifications_reconnect_test.exs @@ -59,10 +59,11 @@ defmodule EventStore.Notifications.NotificationsReconnectTest do end defp shutdown_postgrex_notifications_connection(name) do - pid = Process.whereis(name) - assert is_pid(pid) + conn = Process.whereis(name) + assert is_pid(conn) + + {_, %{protocol: %{sock: {:gen_tcp, sock}}}} = :sys.get_state(conn) - {:gen_tcp, sock} = :sys.get_state(pid).mod_state.protocol.sock :gen_tcp.shutdown(sock, :read_write) end diff --git a/test/subscriptions/subscription_recovery_test.exs b/test/subscriptions/subscription_recovery_test.exs index 5d4fd487..5bfdb1ec 100644 --- a/test/subscriptions/subscription_recovery_test.exs +++ b/test/subscriptions/subscription_recovery_test.exs @@ -1,16 +1,13 @@ defmodule EventStore.Subscriptions.SubscriptionRecoveryTest do use EventStore.StorageCase - @moduletag :slow - - alias EventStore.{EventFactory, RecordedEvent, UUID} + alias EventStore.{EventFactory, RecordedEvent, UUID, Wait} alias EventStore.Subscriptions.Subscription alias TestEventStore, as: EventStore describe "subscription recovery" do test "should receive events after socket is closed" do subscription_name = UUID.uuid4() - stream1_uuid = UUID.uuid4() append_to_stream(stream1_uuid, 10) @@ -39,40 +36,24 @@ defmodule EventStore.Subscriptions.SubscriptionRecoveryTest do end defp kill_socket do - port = wait_until(&get_port/0) + port = get_port() + :erlang.monitor(:port, port) :erlang.port_close(port) + assert_receive {:DOWN, _monitor_ref, _type, _object, _info} end - defp wait_socket() do - # This is because MonitoredServer has some problem if we - # do :sys.get_state very often. So we set a high step - # so we leave time for MonitoredServer to start again - wait_until( - 50_000, - 5_000, - fn -> - refute :undefined == :erlang.port_info(get_port()) - end - ) + defp wait_socket do + Wait.until(fn -> + refute :undefined == :erlang.port_info(get_port()) + end) end defp get_port do - pid = - GenServer.whereis(TestEventStore.EventStore.Notifications.Listener.Postgrex) - |> :sys.get_state() - |> Map.get(:pid) + conn = GenServer.whereis(TestEventStore.Postgrex.Notifications) - refute is_nil(pid) - assert Process.alive?(pid) - - {_, port} = - pid - |> :sys.get_state() - |> Map.get(:mod_state) - |> Map.get(:protocol) - |> Map.get(:sock) + {_, %{protocol: %{sock: {:gen_tcp, port}}}} = :sys.get_state(conn) port end @@ -107,21 +88,4 @@ defmodule EventStore.Subscriptions.SubscriptionRecoveryTest do :ok = Subscription.ack(subscription, received_events) end - - def wait_until(timeout \\ 1000, step \\ 50, fun) - def wait_until(timeout, _step, fun) when timeout <= 0, do: fun.() - - def wait_until(timeout, step, fun) do - try do - fun.() - catch - :exit, _ -> - Process.sleep(step) - wait_until(max(0, timeout - step), step, fun) - end - rescue - _ -> - Process.sleep(step) - wait_until(max(0, timeout - step), step, fun) - end end From ccae017d63e0fff5ccb6fee51e3bacd76cfea847 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 14:33:34 +0100 Subject: [PATCH 15/32] Increase test wait duration --- test/subscriptions/subscription_recovery_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/subscriptions/subscription_recovery_test.exs b/test/subscriptions/subscription_recovery_test.exs index 5bfdb1ec..9f4f6eb7 100644 --- a/test/subscriptions/subscription_recovery_test.exs +++ b/test/subscriptions/subscription_recovery_test.exs @@ -45,7 +45,7 @@ defmodule EventStore.Subscriptions.SubscriptionRecoveryTest do end defp wait_socket do - Wait.until(fn -> + Wait.until(5_000, fn -> refute :undefined == :erlang.port_info(get_port()) end) end From 1c040706b3674100f27198e4744f1bf4782ade24 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 14 Jun 2023 15:02:42 +0100 Subject: [PATCH 16/32] Release v1.4.2 --- CHANGELOG.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44bda6ef..6768020d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## Next release +## v1.4.2 ### Enhancements diff --git a/mix.exs b/mix.exs index a57520fe..dd8d7c03 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule EventStore.Mixfile do use Mix.Project @source_url "https://github.com/commanded/eventstore" - @version "1.4.1" + @version "1.4.2" def project do [ From 80c10d61e965346930c023555a876889da312480 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 10 Jul 2023 10:37:33 +0100 Subject: [PATCH 17/32] Add Yordis Prieto to contributors in README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 42d8c650..a28f55b7 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,7 @@ EventStore exists thanks to the following people who have contributed. - [Victor Oliveira Nascimento](https://github.com/victorolinasc) - [Yamil Díaz Aguirre](https://github.com/Yamilquery) - [Yannis Weishaupt](https://github.com/MrYawe) +- [Yordis Prieto](https://github.com/yordis) ## Need help? From 98f034514614cc69fd28913186aa02c1dc4516fb Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Tue, 3 Oct 2023 23:49:21 -0400 Subject: [PATCH 18/32] chore: fix elixir warning deprecation message Signed-off-by: Yordis Prieto --- lib/event_store/monitored_server.ex | 2 +- lib/event_store/storage/appender.ex | 4 ++-- lib/event_store/storage/create_stream.ex | 4 ++-- lib/event_store/storage/delete_stream.ex | 10 +++++----- lib/event_store/storage/reader.ex | 8 ++++---- lib/event_store/storage/snapshot.ex | 6 +++--- lib/event_store/storage/subscription.ex | 6 +++--- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/event_store/monitored_server.ex b/lib/event_store/monitored_server.ex index c8ab6ce6..6aca05cd 100644 --- a/lib/event_store/monitored_server.ex +++ b/lib/event_store/monitored_server.ex @@ -181,7 +181,7 @@ defmodule EventStore.MonitoredServer do {:EXIT, ^pid, _reason} -> :ok after shutdown -> - Logger.warn( + Logger.warning( "Monitored server #{inspect(module)} failed to terminate within #{shutdown}, killing it brutally" ) diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 93e4f33e..97299d15 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -34,7 +34,7 @@ defmodule EventStore.Storage.Appender do end) catch {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to append events to stream #{inspect(stream_uuid)} due to: " <> inspect(error) ) @@ -76,7 +76,7 @@ defmodule EventStore.Storage.Appender do end) catch {:error, error} = reply -> - Logger.warn("Failed to link events to stream due to: #{inspect(error)}") + Logger.warning("Failed to link events to stream due to: #{inspect(error)}") reply end diff --git a/lib/event_store/storage/create_stream.ex b/lib/event_store/storage/create_stream.ex index fd4ab8f3..758620b9 100644 --- a/lib/event_store/storage/create_stream.ex +++ b/lib/event_store/storage/create_stream.ex @@ -24,13 +24,13 @@ defmodule EventStore.Storage.CreateStream do {:error, %Postgrex.Error{postgres: %{code: :unique_violation}}}, stream_uuid ) do - Logger.warn("Failed to create stream #{inspect(stream_uuid)}, already exists") + Logger.warning("Failed to create stream #{inspect(stream_uuid)}, already exists") {:error, :stream_exists} end defp handle_response({:error, error}, stream_uuid) do - Logger.warn("Failed to create stream #{inspect(stream_uuid)} due to: " <> inspect(error)) + Logger.warning("Failed to create stream #{inspect(stream_uuid)} due to: " <> inspect(error)) {:error, error} end diff --git a/lib/event_store/storage/delete_stream.ex b/lib/event_store/storage/delete_stream.ex index 799ae02b..82da7f73 100644 --- a/lib/event_store/storage/delete_stream.ex +++ b/lib/event_store/storage/delete_stream.ex @@ -17,14 +17,14 @@ defmodule EventStore.Storage.DeleteStream do :ok {:ok, %Postgrex.Result{num_rows: 0}} -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to soft delete stream #{inspect(stream_id)} due to: stream not found" end) {:error, :stream_not_found} {:error, error} = reply -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to soft delete stream #{inspect(stream_id)} due to: " <> inspect(error) end) @@ -44,21 +44,21 @@ defmodule EventStore.Storage.DeleteStream do :ok {:ok, %Postgrex.Result{num_rows: 0}} -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to hard delete stream #{inspect(stream_id)} due to: stream not found" end) {:error, :stream_not_found} {:error, %Postgrex.Error{postgres: %{code: :feature_not_supported}} = error} -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to hard delete stream #{inspect(stream_id)} due to: " <> inspect(error) end) {:error, :not_supported} {:error, error} = reply -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to hard delete stream #{inspect(stream_id)} due to: " <> inspect(error) end) diff --git a/lib/event_store/storage/reader.ex b/lib/event_store/storage/reader.ex index 27189d7d..d67bdd64 100644 --- a/lib/event_store/storage/reader.ex +++ b/lib/event_store/storage/reader.ex @@ -34,7 +34,7 @@ defmodule EventStore.Storage.Reader do end defp failed_to_read(stream_id, reason) do - Logger.warn(fn -> + Logger.warning(fn -> "Failed to read events from stream id #{stream_id} due to: #{inspect(reason)}" end) @@ -133,17 +133,17 @@ defmodule EventStore.Storage.Reader do {:ok, rows} {:error, %Postgrex.Error{postgres: %{message: message}}} -> - Logger.warn("Failed to read events from stream due to: " <> inspect(message)) + Logger.warning("Failed to read events from stream due to: " <> inspect(message)) {:error, message} {:error, %DBConnection.ConnectionError{message: message}} -> - Logger.warn("Failed to read events from stream due to: " <> inspect(message)) + Logger.warning("Failed to read events from stream due to: " <> inspect(message)) {:error, message} {:error, error} = reply -> - Logger.warn("Failed to read events from stream due to: " <> inspect(error)) + Logger.warning("Failed to read events from stream due to: " <> inspect(error)) reply end diff --git a/lib/event_store/storage/snapshot.ex b/lib/event_store/storage/snapshot.ex index 9364fcf8..b388ebc9 100644 --- a/lib/event_store/storage/snapshot.ex +++ b/lib/event_store/storage/snapshot.ex @@ -19,7 +19,7 @@ defmodule EventStore.Storage.Snapshot do {:ok, to_snapshot_from_row(row)} {:error, error} = reply -> - Logger.warn(fn -> + Logger.warning(fn -> "Failed to read snapshot for source \"#{source_uuid}\" due to: #{inspect(error)}" end) @@ -46,7 +46,7 @@ defmodule EventStore.Storage.Snapshot do :ok {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to record snapshot for source \"#{source_uuid}\" at version \"#{source_version}\" due to: " <> inspect(error) ) @@ -65,7 +65,7 @@ defmodule EventStore.Storage.Snapshot do :ok {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to delete snapshot for source \"#{source_uuid}\" due to: " <> inspect(error) ) diff --git a/lib/event_store/storage/subscription.ex b/lib/event_store/storage/subscription.ex index f14d4014..7082065d 100644 --- a/lib/event_store/storage/subscription.ex +++ b/lib/event_store/storage/subscription.ex @@ -117,7 +117,7 @@ defmodule EventStore.Storage.Subscription do {:error, :subscription_already_exists} {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to create stream create subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\" due to: " <> inspect(error) ) @@ -140,7 +140,7 @@ defmodule EventStore.Storage.Subscription do :ok {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to ack last seen event on stream \"#{stream_uuid}\" named \"#{subscription_name}\" due to: " <> inspect(error) ) @@ -171,7 +171,7 @@ defmodule EventStore.Storage.Subscription do :ok {:error, error} = reply -> - Logger.warn( + Logger.warning( "Failed to delete subscription to stream \"#{stream_uuid}\" named \"#{subscription_name}\" due to: " <> inspect(error) ) From 2233e3b2a8d14fc482453162f7f96163dac48923 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Tue, 17 Oct 2023 17:12:03 -0400 Subject: [PATCH 19/32] bump elixir version requirement --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index dd8d7c03..d509ba5b 100644 --- a/mix.exs +++ b/mix.exs @@ -8,7 +8,7 @@ defmodule EventStore.Mixfile do [ app: :eventstore, version: @version, - elixir: "~> 1.10", + elixir: "~> 1.11", elixirc_paths: elixirc_paths(Mix.env()), deps: deps(), description: description(), From 287fa870b14ce4123b50ab928ae2ce10b36af2ae Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 18 Oct 2023 10:49:47 +0100 Subject: [PATCH 20/32] Include #278 in CHANGELOG --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6768020d..a5e65c6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Next release + +Support Elixir v1.11 and later. + +### Enhancements + +* Fix Elixir `Logger.warn/2` warning deprecation message ([#278](https://github.com/commanded/eventstore/pull/278)). + ## v1.4.2 ### Enhancements From 6e2b8a1b41d2ca36afdaea38e20b9590a73d769b Mon Sep 17 00:00:00 2001 From: Ilya Suzdalnitski Date: Tue, 12 Sep 2023 00:26:23 +0000 Subject: [PATCH 21/32] allow configuring the default database --- guides/Getting Started.md | 8 ++++++++ lib/event_store/storage/database.ex | 8 ++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/guides/Getting Started.md b/guides/Getting Started.md index 923a5c60..4bb5e38d 100644 --- a/guides/Getting Started.md +++ b/guides/Getting Started.md @@ -41,6 +41,14 @@ EventStore is [available in Hex](https://hex.pm/packages/eventstore) and can be url: "postgres://postgres:postgres@localhost/eventstore" ``` + + **Note:** Some managed database providers (such as DigitalOcean) don't provide access to the default `postgres` database. In such case, you can specify a default database in the following way: + + ```elixir + config :my_app, MyApp.EventStore, + default_database: "defaultdb", + ``` + **Note:** To use an EventStore with Commanded you should configure the event store to use Commanded's JSON serializer which provides additional support for JSON decoding: diff --git a/lib/event_store/storage/database.ex b/lib/event_store/storage/database.ex index 6cdf8332..a7eabc0e 100644 --- a/lib/event_store/storage/database.ex +++ b/lib/event_store/storage/database.ex @@ -89,7 +89,9 @@ defmodule EventStore.Storage.Database do raise ":database is nil in repository configuration" encoding = opts[:encoding] || "UTF8" - opts = Keyword.put(opts, :database, "postgres") + + default_database = Keyword.get(opts, :default_database, "postgres") + opts = Keyword.put(opts, :database, default_database) command = ~s(CREATE DATABASE "#{database}" ENCODING '#{encoding}') @@ -117,7 +119,9 @@ defmodule EventStore.Storage.Database do Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" command = "DROP DATABASE \"#{database}\"" - opts = Keyword.put(opts, :database, "postgres") + + default_database = Keyword.get(opts, :default_database, "postgres") + opts = Keyword.put(opts, :database, default_database) case run_query(command, opts) do {:ok, _} -> From 64637adf08b13a0dccba5cfdebc690a1346b9b47 Mon Sep 17 00:00:00 2001 From: Fredrik Teschke Date: Tue, 27 Jun 2023 08:37:05 +0200 Subject: [PATCH 22/32] Parse url with encoded hash in password --- lib/event_store/config/parser.ex | 2 +- test/config_test.exs | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/event_store/config/parser.ex b/lib/event_store/config/parser.ex index 5ff8c3d9..70b16205 100644 --- a/lib/event_store/config/parser.ex +++ b/lib/event_store/config/parser.ex @@ -39,7 +39,7 @@ defmodule EventStore.Config.Parser do defp parse_url(""), do: [] defp parse_url(url) do - info = url |> URI.decode() |> URI.parse() + info = URI.parse(url) if is_nil(info.host) do raise ArgumentError, message: "host is not present" diff --git a/test/config_test.exs b/test/config_test.exs index e46eccf1..6d5e848c 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -110,6 +110,22 @@ defmodule EventStore.ConfigTest do ) end + test "parse url with encoded hash in password" do + config = [url: "postgres://username:password%23with_hash@localhost/database"] + + assert_parsed_config(config, + enable_hard_deletes: false, + column_data_type: "bytea", + schema: "public", + timeout: 15_000, + pool: EventStore.Config.get_pool(), + username: "username", + password: "password#with_hash", + database: "database", + hostname: "localhost" + ) + end + test "parse session_mode_url" do config = [session_mode_url: "postgres://username:password@localhost/database"] From 967176c9c91d3dedeedb5f5241f3e910a7db1f92 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 18 Oct 2023 11:07:48 +0100 Subject: [PATCH 23/32] Include #275 and #277 in CHANGELOG --- CHANGELOG.md | 2 ++ README.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5e65c6f..b642e8af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Support Elixir v1.11 and later. ### Enhancements +* Parse url with encoded hash in password ([#275](https://github.com/commanded/eventstore/pull/275)). +* Allow configuring the default database ([#277](https://github.com/commanded/eventstore/pull/277)). * Fix Elixir `Logger.warn/2` warning deprecation message ([#278](https://github.com/commanded/eventstore/pull/278)). ## v1.4.2 diff --git a/README.md b/README.md index a28f55b7..5ae772d6 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,8 @@ EventStore exists thanks to the following people who have contributed. - [Douglas Vought](https://github.com/voughtdq) - [Eamon Taaffe](https://github.com/eamontaaffe) - [Floris Huetink](https://github.com/florish) +- [Fredrik Teschke](https://github.com/ftes) +- [Ilya Suzdalnitskiy](https://github.com/suzdalnitski) - [Jan Vereecken](https://github.com/javereec) - [Kai Kuchenbecker](https://github.com/kaikuchn) - [Kaz Walker](https://github.com/KazW) From e06ca62adbfd0874e40635e8442686d8002acaa8 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Fri, 10 Feb 2023 16:56:05 +0000 Subject: [PATCH 24/32] Quote schema names in SQL --- lib/event_store/sql/init.ex | 4 ++-- lib/event_store/sql/reset.ex | 4 ++-- .../sql/statements/advisory_unlock.sql.eex | 2 +- lib/event_store/sql/statements/count_streams.sql.eex | 2 +- lib/event_store/sql/statements/create_stream.sql.eex | 2 +- .../sql/statements/delete_snapshot.sql.eex | 2 +- .../sql/statements/delete_subscription.sql.eex | 2 +- .../sql/statements/hard_delete_stream.sql.eex | 8 ++++---- lib/event_store/sql/statements/insert_events.sql.eex | 12 ++++++------ .../sql/statements/insert_events_any_version.sql.eex | 12 ++++++------ .../sql/statements/insert_link_events.sql.eex | 8 ++++---- .../sql/statements/insert_snapshot.sql.eex | 2 +- .../sql/statements/insert_subscription.sql.eex | 2 +- .../sql/statements/query_all_subscriptions.sql.eex | 2 +- .../sql/statements/query_snapshot.sql.eex | 2 +- .../statements/query_stream_events_backward.sql.eex | 6 +++--- .../statements/query_stream_events_forward.sql.eex | 6 +++--- .../sql/statements/query_stream_info.sql.eex | 2 +- lib/event_store/sql/statements/query_streams.sql.eex | 2 +- .../sql/statements/query_subscription.sql.eex | 2 +- .../sql/statements/soft_delete_stream.sql.eex | 2 +- .../sql/statements/subscription_ack.sql.eex | 2 +- .../sql/statements/try_advisory_lock.sql.eex | 2 +- lib/event_store/storage/schema.ex | 4 ++-- lib/event_store/tasks/migrate.ex | 4 ++-- lib/event_store/tasks/migrations.ex | 2 +- 26 files changed, 50 insertions(+), 50 deletions(-) diff --git a/lib/event_store/sql/init.ex b/lib/event_store/sql/init.ex index e71bfbad..ab6d3a7b 100644 --- a/lib/event_store/sql/init.ex +++ b/lib/event_store/sql/init.ex @@ -8,7 +8,7 @@ defmodule EventStore.Sql.Init do schema = Keyword.fetch!(config, :schema) [ - "SET LOCAL search_path TO #{schema};", + ~s/SET LOCAL search_path TO "#{schema}";/, create_streams_table(), create_stream_uuid_index(), create_events_table(column_data_type), @@ -33,7 +33,7 @@ defmodule EventStore.Sql.Init do end defp create_streams_table do - """ + """ CREATE TABLE streams ( stream_id bigserial PRIMARY KEY NOT NULL, diff --git a/lib/event_store/sql/reset.ex b/lib/event_store/sql/reset.ex index b9dd3686..79f85cf5 100644 --- a/lib/event_store/sql/reset.ex +++ b/lib/event_store/sql/reset.ex @@ -7,8 +7,8 @@ defmodule EventStore.Sql.Reset do schema = Keyword.fetch!(config, :schema) [ - "SET LOCAL search_path TO #{schema};", - "SET LOCAL eventstore.reset TO 'on'", + ~s/SET LOCAL search_path TO "#{schema}";/, + ~s/SET LOCAL eventstore.reset TO 'on'/, truncate_tables(), seed_all_stream() ] diff --git a/lib/event_store/sql/statements/advisory_unlock.sql.eex b/lib/event_store/sql/statements/advisory_unlock.sql.eex index a9fb577c..18aa5bcd 100644 --- a/lib/event_store/sql/statements/advisory_unlock.sql.eex +++ b/lib/event_store/sql/statements/advisory_unlock.sql.eex @@ -1,4 +1,4 @@ SELECT pg_advisory_unlock( - '<%= schema %>.subscriptions'::regclass::oid::int, + '"<%= schema %>".subscriptions'::regclass::oid::int, (CASE WHEN $1 > 2147483647 THEN mod($1, 2147483647) ELSE $1 END)::int ); diff --git a/lib/event_store/sql/statements/count_streams.sql.eex b/lib/event_store/sql/statements/count_streams.sql.eex index 48f1d7df..b522b18d 100644 --- a/lib/event_store/sql/statements/count_streams.sql.eex +++ b/lib/event_store/sql/statements/count_streams.sql.eex @@ -1,3 +1,3 @@ SELECT COUNT(*) -FROM <%= schema %>.streams +FROM "<%= schema %>".streams WHERE $1::text IS NULL OR stream_uuid LIKE $1::text; diff --git a/lib/event_store/sql/statements/create_stream.sql.eex b/lib/event_store/sql/statements/create_stream.sql.eex index f66108f7..f242caa4 100644 --- a/lib/event_store/sql/statements/create_stream.sql.eex +++ b/lib/event_store/sql/statements/create_stream.sql.eex @@ -1,3 +1,3 @@ -INSERT INTO <%= schema %>.streams (stream_uuid) +INSERT INTO "<%= schema %>".streams (stream_uuid) VALUES ($1) RETURNING stream_id; diff --git a/lib/event_store/sql/statements/delete_snapshot.sql.eex b/lib/event_store/sql/statements/delete_snapshot.sql.eex index 288f1451..39a414f4 100644 --- a/lib/event_store/sql/statements/delete_snapshot.sql.eex +++ b/lib/event_store/sql/statements/delete_snapshot.sql.eex @@ -1,2 +1,2 @@ -DELETE FROM <%= schema %>.snapshots +DELETE FROM "<%= schema %>".snapshots WHERE source_uuid = $1; diff --git a/lib/event_store/sql/statements/delete_subscription.sql.eex b/lib/event_store/sql/statements/delete_subscription.sql.eex index b7250820..fa707def 100644 --- a/lib/event_store/sql/statements/delete_subscription.sql.eex +++ b/lib/event_store/sql/statements/delete_subscription.sql.eex @@ -1,2 +1,2 @@ -DELETE FROM <%= schema %>.subscriptions +DELETE FROM "<%= schema %>".subscriptions WHERE stream_uuid = $1 AND subscription_name = $2; diff --git a/lib/event_store/sql/statements/hard_delete_stream.sql.eex b/lib/event_store/sql/statements/hard_delete_stream.sql.eex index 232c42bf..22610b80 100644 --- a/lib/event_store/sql/statements/hard_delete_stream.sql.eex +++ b/lib/event_store/sql/statements/hard_delete_stream.sql.eex @@ -1,16 +1,16 @@ WITH deleted_stream_events AS ( - DELETE FROM <%= schema %>.stream_events + DELETE FROM "<%= schema %>".stream_events WHERE stream_id = $1 RETURNING event_id ), linked_events AS ( - DELETE FROM <%= schema %>.stream_events + DELETE FROM "<%= schema %>".stream_events WHERE event_id IN (SELECT event_id FROM deleted_stream_events) ), events AS ( - DELETE FROM <%= schema %>.events + DELETE FROM "<%= schema %>".events WHERE event_id IN (SELECT event_id FROM deleted_stream_events) ) -DELETE FROM <%= schema %>.streams +DELETE FROM "<%= schema %>".streams WHERE stream_id = $1 RETURNING stream_id; diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index c83a0629..49bcae15 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -7,7 +7,7 @@ WITH <% end %> ), events AS ( - INSERT INTO <%= schema %>.events + INSERT INTO "<%= schema %>".events ( event_id, event_type, @@ -25,18 +25,18 @@ WITH ), stream AS ( <%= if stream_id do %> - UPDATE <%= schema %>.streams + UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = $1::bigint returning stream_id <% else %> - INSERT INTO <%= schema %>.streams (stream_uuid, stream_version) + INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version) VALUES ($1, $2::bigint) returning stream_id <% end %> ), source_stream_events AS ( - INSERT INTO <%= schema %>.stream_events + INSERT INTO "<%= schema %>".stream_events ( event_id, stream_id, @@ -53,12 +53,12 @@ WITH FROM new_events_indexes, stream ), linked_stream AS ( - UPDATE <%= schema %>.streams + UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version ) -INSERT INTO <%= schema %>.stream_events +INSERT INTO "<%= schema %>".stream_events ( event_id, stream_id, diff --git a/lib/event_store/sql/statements/insert_events_any_version.sql.eex b/lib/event_store/sql/statements/insert_events_any_version.sql.eex index 4334996a..7be918e9 100644 --- a/lib/event_store/sql/statements/insert_events_any_version.sql.eex +++ b/lib/event_store/sql/statements/insert_events_any_version.sql.eex @@ -1,12 +1,12 @@ WITH stream AS ( <%= if stream_id do %> - UPDATE <%= schema %>.streams + UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = $1::bigint returning stream_id, stream_version - $2::bigint as initial_stream_version <% else %> - INSERT INTO <%= schema %>.streams (stream_uuid, stream_version) + INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version) VALUES ($1, $2::bigint) returning stream_id, stream_version - $2::bigint as initial_stream_version <% end %> @@ -19,7 +19,7 @@ WITH <% end %> ), events AS ( - INSERT INTO <%= schema %>.events + INSERT INTO "<%= schema %>".events ( event_id, event_type, @@ -36,7 +36,7 @@ WITH <% end %> ), source_stream_events AS ( - INSERT INTO <%= schema %>.stream_events + INSERT INTO "<%= schema %>".stream_events ( event_id, stream_id, @@ -53,12 +53,12 @@ WITH FROM new_events_indexes, stream ), linked_stream AS ( - UPDATE <%= schema %>.streams + UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version ) -INSERT INTO <%= schema %>.stream_events +INSERT INTO "<%= schema %>".stream_events ( event_id, stream_id, diff --git a/lib/event_store/sql/statements/insert_link_events.sql.eex b/lib/event_store/sql/statements/insert_link_events.sql.eex index b3e0d120..6a733d21 100644 --- a/lib/event_store/sql/statements/insert_link_events.sql.eex +++ b/lib/event_store/sql/statements/insert_link_events.sql.eex @@ -1,6 +1,6 @@ WITH stream AS ( - UPDATE <%= schema %>.streams + UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = $1::bigint RETURNING stream_version - $2 as initial_stream_version @@ -8,11 +8,11 @@ WITH linked_events (index, event_id) AS ( VALUES <%= for i <- 0..(number_of_events - 1) do %> - <%= unless i == 0 do %>,<% end %> + <%= unless i == 0 do %>,<% end %> ($<%= i * 2 + 3 %>::bigint, $<%= i * 2 + 4 %>::uuid) <% end %> ) -INSERT INTO <%= schema %>.stream_events +INSERT INTO "<%= schema %>".stream_events ( stream_id, stream_version, @@ -28,6 +28,6 @@ SELECT original_stream_events.stream_version FROM linked_events CROSS JOIN stream -INNER JOIN <%= schema %>.stream_events as original_stream_events +INNER JOIN "<%= schema %>".stream_events as original_stream_events ON original_stream_events.event_id = linked_events.event_id AND original_stream_events.stream_id = original_stream_events.original_stream_id; diff --git a/lib/event_store/sql/statements/insert_snapshot.sql.eex b/lib/event_store/sql/statements/insert_snapshot.sql.eex index 802307b6..036b9733 100644 --- a/lib/event_store/sql/statements/insert_snapshot.sql.eex +++ b/lib/event_store/sql/statements/insert_snapshot.sql.eex @@ -1,4 +1,4 @@ -INSERT INTO <%= schema %>.snapshots +INSERT INTO "<%= schema %>".snapshots ( source_uuid, source_version, diff --git a/lib/event_store/sql/statements/insert_subscription.sql.eex b/lib/event_store/sql/statements/insert_subscription.sql.eex index 0dc3f8af..41821ff1 100644 --- a/lib/event_store/sql/statements/insert_subscription.sql.eex +++ b/lib/event_store/sql/statements/insert_subscription.sql.eex @@ -1,4 +1,4 @@ -INSERT INTO <%= schema %>.subscriptions +INSERT INTO "<%= schema %>".subscriptions ( stream_uuid, subscription_name, diff --git a/lib/event_store/sql/statements/query_all_subscriptions.sql.eex b/lib/event_store/sql/statements/query_all_subscriptions.sql.eex index bebc3863..2259b167 100644 --- a/lib/event_store/sql/statements/query_all_subscriptions.sql.eex +++ b/lib/event_store/sql/statements/query_all_subscriptions.sql.eex @@ -4,5 +4,5 @@ SELECT subscription_name, last_seen, created_at -FROM <%= schema %>.subscriptions +FROM "<%= schema %>".subscriptions ORDER BY created_at; diff --git a/lib/event_store/sql/statements/query_snapshot.sql.eex b/lib/event_store/sql/statements/query_snapshot.sql.eex index bf9222ba..a6aae748 100644 --- a/lib/event_store/sql/statements/query_snapshot.sql.eex +++ b/lib/event_store/sql/statements/query_snapshot.sql.eex @@ -5,5 +5,5 @@ SELECT data, metadata, created_at -FROM <%= schema %>.snapshots +FROM "<%= schema %>".snapshots WHERE source_uuid = $1; diff --git a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex index afbeff92..e107a1cd 100644 --- a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex @@ -9,9 +9,9 @@ SELECT e.data, e.metadata, e.created_at -FROM <%= schema %>.stream_events se -INNER JOIN <%= schema %>.streams s ON s.stream_id = se.original_stream_id -INNER JOIN <%= schema %>.events e ON se.event_id = e.event_id +FROM "<%= schema %>".stream_events se +INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id +INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id WHERE se.stream_id = $1 AND (se.stream_version <= $2 OR $2 = -1) ORDER BY diff --git a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex index 82c071de..13d6236c 100644 --- a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex @@ -9,9 +9,9 @@ SELECT e.data, e.metadata, e.created_at -FROM <%= schema %>.stream_events se -INNER JOIN <%= schema %>.streams s ON s.stream_id = se.original_stream_id -INNER JOIN <%= schema %>.events e ON se.event_id = e.event_id +FROM "<%= schema %>".stream_events se +INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id +INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id WHERE se.stream_id = $1 AND se.stream_version >= $2 ORDER BY diff --git a/lib/event_store/sql/statements/query_stream_info.sql.eex b/lib/event_store/sql/statements/query_stream_info.sql.eex index 0c3e0e4b..b5d74b3d 100644 --- a/lib/event_store/sql/statements/query_stream_info.sql.eex +++ b/lib/event_store/sql/statements/query_stream_info.sql.eex @@ -4,5 +4,5 @@ SELECT stream_version, created_at, deleted_at -FROM <%= schema %>.streams +FROM "<%= schema %>".streams WHERE stream_uuid = $1; diff --git a/lib/event_store/sql/statements/query_streams.sql.eex b/lib/event_store/sql/statements/query_streams.sql.eex index e36ab750..87e02924 100644 --- a/lib/event_store/sql/statements/query_streams.sql.eex +++ b/lib/event_store/sql/statements/query_streams.sql.eex @@ -4,7 +4,7 @@ SELECT stream_version, created_at, deleted_at -FROM <%= schema %>.streams +FROM "<%= schema %>".streams WHERE $1::text IS NULL OR stream_uuid LIKE $1::text ORDER BY <%= sort_by %> <%= sort_dir %> LIMIT $2 OFFSET $3; diff --git a/lib/event_store/sql/statements/query_subscription.sql.eex b/lib/event_store/sql/statements/query_subscription.sql.eex index 73cd39fe..116a8e21 100644 --- a/lib/event_store/sql/statements/query_subscription.sql.eex +++ b/lib/event_store/sql/statements/query_subscription.sql.eex @@ -4,5 +4,5 @@ SELECT subscription_name, last_seen, created_at -FROM <%= schema %>.subscriptions +FROM "<%= schema %>".subscriptions WHERE stream_uuid = $1 AND subscription_name = $2; diff --git a/lib/event_store/sql/statements/soft_delete_stream.sql.eex b/lib/event_store/sql/statements/soft_delete_stream.sql.eex index 7b8a5be6..e0a806c9 100644 --- a/lib/event_store/sql/statements/soft_delete_stream.sql.eex +++ b/lib/event_store/sql/statements/soft_delete_stream.sql.eex @@ -1,3 +1,3 @@ -UPDATE <%= schema %>.streams +UPDATE "<%= schema %>".streams SET deleted_at = NOW() WHERE stream_id = $1; diff --git a/lib/event_store/sql/statements/subscription_ack.sql.eex b/lib/event_store/sql/statements/subscription_ack.sql.eex index 482e54e9..8b4e8cc3 100644 --- a/lib/event_store/sql/statements/subscription_ack.sql.eex +++ b/lib/event_store/sql/statements/subscription_ack.sql.eex @@ -1,3 +1,3 @@ -UPDATE <%= schema %>.subscriptions +UPDATE "<%= schema %>".subscriptions SET last_seen = $3 WHERE stream_uuid = $1 AND subscription_name = $2; diff --git a/lib/event_store/sql/statements/try_advisory_lock.sql.eex b/lib/event_store/sql/statements/try_advisory_lock.sql.eex index 6e906f22..fac1ab95 100644 --- a/lib/event_store/sql/statements/try_advisory_lock.sql.eex +++ b/lib/event_store/sql/statements/try_advisory_lock.sql.eex @@ -1,4 +1,4 @@ SELECT pg_try_advisory_lock( - '<%= schema %>.subscriptions'::regclass::oid::int, + '"<%= schema %>".subscriptions'::regclass::oid::int, (CASE WHEN $1 > 2147483647 THEN mod($1, 2147483647) ELSE $1 END)::int ); diff --git a/lib/event_store/storage/schema.ex b/lib/event_store/storage/schema.ex index bb1eb957..28bbec7f 100644 --- a/lib/event_store/storage/schema.ex +++ b/lib/event_store/storage/schema.ex @@ -6,7 +6,7 @@ defmodule EventStore.Storage.Schema do def create(config) do schema = Keyword.fetch!(config, :schema) - case Database.execute(config, "CREATE SCHEMA #{schema}") do + case Database.execute(config, ~s/CREATE SCHEMA "#{schema}"/) do :ok -> :ok @@ -21,7 +21,7 @@ defmodule EventStore.Storage.Schema do def drop(config) do schema = Keyword.fetch!(config, :schema) - case Database.execute(config, "DROP SCHEMA #{schema} CASCADE;") do + case Database.execute(config, ~s/DROP SCHEMA "#{schema}" CASCADE;/) do :ok -> :ok diff --git a/lib/event_store/tasks/migrate.ex b/lib/event_store/tasks/migrate.ex index 6b767fcc..5d511b28 100644 --- a/lib/event_store/tasks/migrate.ex +++ b/lib/event_store/tasks/migrate.ex @@ -94,7 +94,7 @@ defmodule EventStore.Tasks.Migrate do path = Application.app_dir(:eventstore, "priv/event_store/migrations/v#{migration}.sql") script = File.read!(path) - statements = ["SET LOCAL search_path TO #{schema}; ", script] + statements = [~s/SET LOCAL search_path TO "#{schema}"; /, script] case transaction(conn, statements) do {:ok, :ok} -> @@ -129,7 +129,7 @@ defmodule EventStore.Tasks.Migrate do conn, """ SELECT major_version, minor_version, patch_version - FROM #{schema}.schema_migrations + FROM "#{schema}".schema_migrations """, [] ) diff --git a/lib/event_store/tasks/migrations.ex b/lib/event_store/tasks/migrations.ex index 870a3e47..8b3dbc19 100644 --- a/lib/event_store/tasks/migrations.ex +++ b/lib/event_store/tasks/migrations.ex @@ -96,7 +96,7 @@ defmodule EventStore.Tasks.Migrations do config |> run_query(""" SELECT major_version, minor_version, patch_version, migrated_at - FROM #{schema}.schema_migrations + FROM "#{schema}".schema_migrations ORDER BY 1, 2, 3 """) |> handle_response() From 203562eafafca82bd076a1dc3273eb84f4196104 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 17:09:34 +0000 Subject: [PATCH 25/32] Include ##266 in CHANGELOG --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b642e8af..a1f29ac6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,13 @@ ## Next release -Support Elixir v1.11 and later. - ### Enhancements +* Support Elixir v1.11 and later. * Parse url with encoded hash in password ([#275](https://github.com/commanded/eventstore/pull/275)). * Allow configuring the default database ([#277](https://github.com/commanded/eventstore/pull/277)). * Fix Elixir `Logger.warn/2` warning deprecation message ([#278](https://github.com/commanded/eventstore/pull/278)). +* Quote schema names in SQL ([#266](https://github.com/commanded/eventstore/pull/266)). ## v1.4.2 From c8df500ffb68239571f5efd1def945407c94a82d Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 17:10:24 +0000 Subject: [PATCH 26/32] Update Mix dependencies --- mix.exs | 2 +- mix.lock | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/mix.exs b/mix.exs index d509ba5b..eecc1d07 100644 --- a/mix.exs +++ b/mix.exs @@ -49,7 +49,7 @@ defmodule EventStore.Mixfile do # Development and test tooling {:benchfella, "~> 0.3", only: :bench}, - {:dialyxir, "~> 1.3", only: :dev, runtime: false}, + {:dialyxir, "~> 1.4", only: :dev, runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev} ] end diff --git a/mix.lock b/mix.lock index 707ce61a..2757add0 100644 --- a/mix.lock +++ b/mix.lock @@ -1,20 +1,20 @@ %{ "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, - "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, - "dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.32", "fa739a0ecfa34493de19426681b23f6814573faee95dfd4b4aafe15a7b5b32c6", [:mix], [], "hexpm", "b8b0dd77d60373e77a3d7e8afa598f325e49e8663a51bcc2b88ef41838cca755"}, + "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, + "ex_doc": {:hex, :ex_doc, "0.31.0", "06eb1dfd787445d9cab9a45088405593dd3bb7fe99e097eaa71f37ba80c7a676", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5350cafa6b7f77bdd107aa2199fe277acf29d739aba5aee7e865fc680c62a110"}, "fsm": {:hex, :fsm, "0.3.1", "087aa9b02779a84320dc7a2d8464452b5308e29877921b2bde81cdba32a12390", [:mix], [], "hexpm", "fbf0d53f89e9082b326b0b5828b94b4c549ff9d1452bbfd00b4d1ac082208e96"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, - "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"}, - "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, - "postgrex": {:hex, :postgrex, "0.17.1", "01c29fd1205940ee55f7addb8f1dc25618ca63a8817e56fac4f6846fc2cddcbe", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "14b057b488e73be2beee508fb1955d8db90d6485c6466428fe9ccf1d6692a555"}, + "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } From f475d36d5acc19290cfe60a4b45f8c75d123c26e Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 17:16:09 +0000 Subject: [PATCH 27/32] Upgrade to latest Erlang and Elixir versions In `.tool-versions` and GitHub Actions workflow. --- .github/workflows/test.yml | 4 ++-- config/test.exs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 59fe6f55..8d98f475 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,8 +8,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - otp: ['24.3', '25.3'] - elixir: ['1.14.5'] + otp: ['25.3', '26.2'] + elixir: ['1.16.0'] services: postgres: diff --git a/config/test.exs b/config/test.exs index 8a72974e..c920f670 100644 --- a/config/test.exs +++ b/config/test.exs @@ -3,7 +3,7 @@ import Config config :logger, backends: [] config :ex_unit, - capture_log: [level: :warn], + capture_log: [level: :warning], assert_receive_timeout: 2_000, refute_receive_timeout: 100 From 6f385a56bb7c3dd2cdb7adb43e6fb6e038764f65 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 17:43:28 +0000 Subject: [PATCH 28/32] Use `~s(...)` sigil instead of `~s/../` --- lib/event_store/sql/init.ex | 4 +-- lib/event_store/sql/reset.ex | 4 +-- lib/event_store/storage/schema.ex | 4 +-- lib/event_store/tasks/migrate.ex | 5 +++- test/streams/single_stream_test.exs | 25 +++++++++++++++---- .../concurrent_subscription_test.exs | 8 ++++-- .../support/stream_subscription_test_case.ex | 15 ++++++++--- 7 files changed, 48 insertions(+), 17 deletions(-) diff --git a/lib/event_store/sql/init.ex b/lib/event_store/sql/init.ex index ab6d3a7b..ea529782 100644 --- a/lib/event_store/sql/init.ex +++ b/lib/event_store/sql/init.ex @@ -8,7 +8,7 @@ defmodule EventStore.Sql.Init do schema = Keyword.fetch!(config, :schema) [ - ~s/SET LOCAL search_path TO "#{schema}";/, + ~s(SET LOCAL search_path TO "#{schema}";), create_streams_table(), create_stream_uuid_index(), create_events_table(column_data_type), @@ -33,7 +33,7 @@ defmodule EventStore.Sql.Init do end defp create_streams_table do - """ + """ CREATE TABLE streams ( stream_id bigserial PRIMARY KEY NOT NULL, diff --git a/lib/event_store/sql/reset.ex b/lib/event_store/sql/reset.ex index 79f85cf5..393b0b7e 100644 --- a/lib/event_store/sql/reset.ex +++ b/lib/event_store/sql/reset.ex @@ -7,8 +7,8 @@ defmodule EventStore.Sql.Reset do schema = Keyword.fetch!(config, :schema) [ - ~s/SET LOCAL search_path TO "#{schema}";/, - ~s/SET LOCAL eventstore.reset TO 'on'/, + ~s(SET LOCAL search_path TO "#{schema}";), + ~s(SET LOCAL eventstore.reset TO 'on';), truncate_tables(), seed_all_stream() ] diff --git a/lib/event_store/storage/schema.ex b/lib/event_store/storage/schema.ex index 28bbec7f..ae463a2e 100644 --- a/lib/event_store/storage/schema.ex +++ b/lib/event_store/storage/schema.ex @@ -6,7 +6,7 @@ defmodule EventStore.Storage.Schema do def create(config) do schema = Keyword.fetch!(config, :schema) - case Database.execute(config, ~s/CREATE SCHEMA "#{schema}"/) do + case Database.execute(config, ~s(CREATE SCHEMA "#{schema}")) do :ok -> :ok @@ -21,7 +21,7 @@ defmodule EventStore.Storage.Schema do def drop(config) do schema = Keyword.fetch!(config, :schema) - case Database.execute(config, ~s/DROP SCHEMA "#{schema}" CASCADE;/) do + case Database.execute(config, ~s(DROP SCHEMA "#{schema}" CASCADE;)) do :ok -> :ok diff --git a/lib/event_store/tasks/migrate.ex b/lib/event_store/tasks/migrate.ex index 5d511b28..b873ccfe 100644 --- a/lib/event_store/tasks/migrate.ex +++ b/lib/event_store/tasks/migrate.ex @@ -94,7 +94,10 @@ defmodule EventStore.Tasks.Migrate do path = Application.app_dir(:eventstore, "priv/event_store/migrations/v#{migration}.sql") script = File.read!(path) - statements = [~s/SET LOCAL search_path TO "#{schema}"; /, script] + statements = [ + ~s(SET LOCAL search_path TO "#{schema}";), + script + ] case transaction(conn, statements) do {:ok, :ok} -> diff --git a/test/streams/single_stream_test.exs b/test/streams/single_stream_test.exs index 19b3c663..8789abe9 100644 --- a/test/streams/single_stream_test.exs +++ b/test/streams/single_stream_test.exs @@ -34,7 +34,10 @@ defmodule EventStore.Streams.SingleStreamTest do utc_now = DateTime.utc_now() {:ok, [event]} = - Stream.read_stream_forward(conn, stream_uuid, 0, 1, schema: schema, serializer: serializer) + Stream.read_stream_forward(conn, stream_uuid, 0, 1, + schema: schema, + serializer: serializer + ) created_at = event.created_at assert created_at != nil @@ -509,14 +512,20 @@ defmodule EventStore.Streams.SingleStreamTest do events = EventFactory.create_events(3) :ok = - Stream.append_to_stream(conn, stream_uuid, 0, events, schema: schema, serializer: serializer) + Stream.append_to_stream(conn, stream_uuid, 0, events, + schema: schema, + serializer: serializer + ) # stream above needed for preventing accidental event_number/stream_version match stream_uuid = UUID.uuid4() events = EventFactory.create_events(3) :ok = - Stream.append_to_stream(conn, stream_uuid, 0, events, schema: schema, serializer: serializer) + Stream.append_to_stream(conn, stream_uuid, 0, events, + schema: schema, + serializer: serializer + ) assert {:ok, 3} = Stream.stream_version(conn, stream_uuid, schema: schema) end @@ -528,7 +537,10 @@ defmodule EventStore.Streams.SingleStreamTest do events = EventFactory.create_events(3) :ok = - Stream.append_to_stream(conn, stream_uuid, 0, events, schema: schema, serializer: serializer) + Stream.append_to_stream(conn, stream_uuid, 0, events, + schema: schema, + serializer: serializer + ) [ stream_uuid: stream_uuid, @@ -543,7 +555,10 @@ defmodule EventStore.Streams.SingleStreamTest do events = EventFactory.create_events(1) :ok = - Stream.append_to_stream(conn, stream_uuid, 0, events, schema: schema, serializer: serializer) + Stream.append_to_stream(conn, stream_uuid, 0, events, + schema: schema, + serializer: serializer + ) [ other_stream_uuid: stream_uuid, diff --git a/test/subscriptions/concurrent_subscription_test.exs b/test/subscriptions/concurrent_subscription_test.exs index c80a1e68..da48f93b 100644 --- a/test/subscriptions/concurrent_subscription_test.exs +++ b/test/subscriptions/concurrent_subscription_test.exs @@ -108,10 +108,14 @@ defmodule EventStore.Subscriptions.ConcurrentSubscriptionTest do subscription_name = UUID.uuid4() assert {:ok, _subscription} = - EventStore.subscribe_to_all_streams(subscription_name, self(), concurrency_limit: 2) + EventStore.subscribe_to_all_streams(subscription_name, self(), + concurrency_limit: 2 + ) assert {:error, :already_subscribed} = - EventStore.subscribe_to_all_streams(subscription_name, self(), concurrency_limit: 2) + EventStore.subscribe_to_all_streams(subscription_name, self(), + concurrency_limit: 2 + ) end test "should send events to all subscribers" do diff --git a/test/subscriptions/support/stream_subscription_test_case.ex b/test/subscriptions/support/stream_subscription_test_case.ex index 0f081082..8f7fee9f 100644 --- a/test/subscriptions/support/stream_subscription_test_case.ex +++ b/test/subscriptions/support/stream_subscription_test_case.ex @@ -238,19 +238,28 @@ defmodule EventStore.Subscriptions.StreamSubscriptionTestCase do # Start receiving remaining events acknowledge received events subscription = - assert_ack(subscription, subscriber, event1, expected_last_sent: 4, expected_last_ack: 1) + assert_ack(subscription, subscriber, event1, + expected_last_sent: 4, + expected_last_ack: 1 + ) assert_receive {:events, [received_event], ^subscriber} assert_event(Enum.at(events, 3), received_event) subscription = - assert_ack(subscription, subscriber, event2, expected_last_sent: 5, expected_last_ack: 2) + assert_ack(subscription, subscriber, event2, + expected_last_sent: 5, + expected_last_ack: 2 + ) assert_receive {:events, [received_event], ^subscriber} assert_event(Enum.at(events, 4), received_event) subscription = - assert_ack(subscription, subscriber, event3, expected_last_sent: 6, expected_last_ack: 3) + assert_ack(subscription, subscriber, event3, + expected_last_sent: 6, + expected_last_ack: 3 + ) assert_receive {:events, [received_event], ^subscriber} assert_event(Enum.at(events, 5), received_event) From fba60eb462e4602f4bafecca437bda99df2a2288 Mon Sep 17 00:00:00 2001 From: dvic Date: Fri, 3 Nov 2023 20:30:52 +0100 Subject: [PATCH 29/32] Fix unused read batch size --- lib/event_store.ex | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/event_store.ex b/lib/event_store.ex index c571254a..a7b63b5f 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -533,6 +533,12 @@ defmodule EventStore do conn = Keyword.get(opts, :conn) || Keyword.fetch!(config, :conn) timeout = timeout(opts, config) + config = + case Keyword.fetch(opts, :read_batch_size) do + {:ok, read_batch_size} -> Keyword.put(config, :read_batch_size, read_batch_size) + :error -> config + end + {conn, Keyword.put(config, :timeout, timeout)} end From 8a17b67b11b9590c6611be5da72014f0c7e1e21d Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 17:54:36 +0000 Subject: [PATCH 30/32] Include #279 in CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1f29ac6..9db4a9ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Allow configuring the default database ([#277](https://github.com/commanded/eventstore/pull/277)). * Fix Elixir `Logger.warn/2` warning deprecation message ([#278](https://github.com/commanded/eventstore/pull/278)). * Quote schema names in SQL ([#266](https://github.com/commanded/eventstore/pull/266)). +* Fix unused read batch size ([#279](https://github.com/commanded/eventstore/pull/279)). ## v1.4.2 From e952c64cb3185d35f97710bbd62de31e67fe299d Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 8 Jan 2024 18:04:13 +0000 Subject: [PATCH 31/32] Release v1.4.3 --- CHANGELOG.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9db4a9ac..2f28dfc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## Next release +## v1.4.3 ### Enhancements diff --git a/mix.exs b/mix.exs index eecc1d07..cdbc0be8 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule EventStore.Mixfile do use Mix.Project @source_url "https://github.com/commanded/eventstore" - @version "1.4.2" + @version "1.4.3" def project do [ From b0a5e022d2d45af9dcea08ffef721a66569c4ca8 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 9 Jan 2024 12:33:09 +0000 Subject: [PATCH 32/32] Event appender returns all errors to the caller --- lib/event_store/storage/appender.ex | 58 ++++++++++++++--------------- test/storage/append_events_test.exs | 23 ++++++++---- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 97299d15..ffd6b5c2 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -1,11 +1,10 @@ defmodule EventStore.Storage.Appender do @moduledoc false - - require Logger - alias EventStore.{RecordedEvent, UUID} alias EventStore.Sql.Statements + require Logger + @doc """ Append the given list of events to storage. @@ -15,7 +14,7 @@ defmodule EventStore.Storage.Appender do Returns `:ok` on success, `{:error, reason}` on failure. """ def append(conn, stream_id, events, opts) do - stream_uuid = stream_uuid(events) + [%RecordedEvent{stream_uuid: stream_uuid} | _] = events try do events @@ -108,11 +107,13 @@ defmodule EventStore.Storage.Appender do end stream_id_or_uuid = stream_id || stream_uuid - parameters = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events) + params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events) - conn - |> Postgrex.query(statement, parameters, opts) - |> handle_response() + case Postgrex.query(conn, statement, params, opts) do + {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} + {:ok, %Postgrex.Result{}} -> :ok + {:error, error} -> handle_error(error) + end end defp build_insert_parameters(events) do @@ -147,43 +148,40 @@ defmodule EventStore.Storage.Appender do defp insert_link_events(conn, params, event_count, schema, opts) do statement = Statements.insert_link_events(schema, event_count) - conn - |> Postgrex.query(statement, params, opts) - |> handle_response() + case Postgrex.query(conn, statement, params, opts) do + {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} + {:ok, %Postgrex.Result{}} -> :ok + {:error, error} -> handle_error(error) + end end - defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}), do: {:error, :not_found} - defp handle_response({:ok, %Postgrex.Result{}}), do: :ok + defp handle_error(%Postgrex.Error{} = error) do + %Postgrex.Error{postgres: postgres} = error - defp handle_response({:error, %Postgrex.Error{} = error}) do - %Postgrex.Error{postgres: %{code: error_code, constraint: constraint}} = error - - case {error_code, constraint} do - {:foreign_key_violation, _constraint} -> + case postgres do + %{code: :foreign_key_violation} -> {:error, :not_found} - {:unique_violation, "events_pkey"} -> + %{code: :unique_violation, constraint: "events_pkey"} -> {:error, :duplicate_event} - {:unique_violation, "stream_events_pkey"} -> + %{code: :unique_violation, constraint: "stream_events_pkey"} -> {:error, :duplicate_event} - {:unique_violation, "ix_streams_stream_uuid"} -> - # EventStore.Streams.Stream will retry when it gets this - # error code. That will always work because on the second - # time around, the stream will have been made, so the - # race to create the stream will have been resolved. + %{code: :unique_violation, constraint: "ix_streams_stream_uuid"} -> + # EventStore.Streams.Stream will retry when it gets this error code. That will always work + # because on the second time around, the stream will have been made, so the race to create + # the stream will have been resolved. {:error, :duplicate_stream_uuid} - {:unique_violation, _constraint} -> + %{code: :unique_violation} -> {:error, :wrong_expected_version} - {error_code, _constraint} -> + %{code: error_code} -> {:error, error_code} end end - defp handle_response({:error, %DBConnection.ConnectionError{}} = reply), do: reply - - defp stream_uuid([event | _]), do: event.stream_uuid + # Return all other errors to the caller + defp handle_error(error), do: {:error, error} end diff --git a/test/storage/append_events_test.exs b/test/storage/append_events_test.exs index 0ae2198f..e30d65af 100644 --- a/test/storage/append_events_test.exs +++ b/test/storage/append_events_test.exs @@ -4,13 +4,6 @@ defmodule EventStore.Storage.AppendEventsTest do alias EventStore.{EventFactory, RecordedEvent, UUID} alias EventStore.Storage.{Appender, CreateStream} - test "append single event with a db connection error" do - conn = start_supervised!({Postgrex, TestEventStore.config(queue_timeout: -1, queue_target: -1)}) |> dbg() - recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4()) - assert {:error, %DBConnection.ConnectionError{}} = - Appender.append(conn, 1, recorded_events, schema: "public") - end - test "append single event to new stream", %{conn: conn, schema: schema} = context do {:ok, stream_uuid, stream_id} = create_stream(context) recorded_events = EventFactory.create_recorded_events(1, stream_uuid) @@ -212,6 +205,22 @@ defmodule EventStore.Storage.AppendEventsTest do end end + test "append event to schema which does not exist", %{conn: conn} do + recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4()) + + assert {:error, :undefined_table} = + Appender.append(conn, 1, recorded_events, schema: "doesnotexist") + end + + test "append single event with a db connection error", %{conn: conn, schema: schema} do + recorded_events = EventFactory.create_recorded_events(1, UUID.uuid4()) + + # Using Postgrex query timeout value of zero will cause a `DBConnection.ConnectionError` error + # to be returned. + assert {:error, %DBConnection.ConnectionError{}} = + Appender.append(conn, 1, recorded_events, schema: schema, timeout: 0) + end + defp create_stream(context) do %{conn: conn, schema: schema} = context