Skip to content

Commit

Permalink
Refactored AutoRefresh to use the new PGListener interface
Browse files Browse the repository at this point in the history
The new PGListener API is less resource hungry: Previous AutoRefresh was taking one database connection per watched table from the database pool. With PGListener it's only ever using one database connection from the database pool.
  • Loading branch information
mpscholten committed Dec 5, 2021
1 parent 78d5cfb commit 420890a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 82 deletions.
38 changes: 32 additions & 6 deletions IHP/AutoRefresh.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import qualified Control.Concurrent.MVar as MVar
import qualified Data.Maybe as Maybe
import qualified Data.Text as Text
import IHP.WebSocket
import qualified IHP.PGNotify as PGNotify
import IHP.Controller.Context
import qualified IHP.PGListener as PGListener
import qualified Database.PostgreSQL.Simple.Types as PG

initAutoRefresh :: (?context :: ControllerContext, ?applicationContext :: ApplicationContext) => IO ()
initAutoRefresh = do
Expand Down Expand Up @@ -156,7 +157,13 @@ registerNotificationTrigger touchedTablesVar autoRefreshServer = do

let subscriptionRequired = touchedTables |> filter (\table -> subscribedTables |> Set.notMember table)
modifyIORef autoRefreshServer (\server -> server { subscribedTables = get #subscribedTables server <> Set.fromList subscriptionRequired })
subscriptions <- subscriptionRequired |> mapM (\table -> PGNotify.watchInsertOrUpdateTable table do

pgListener <- get #pgListener <$> readIORef autoRefreshServer
subscriptions <- subscriptionRequired |> mapM (\table -> do
let createTriggerSql = notificationTrigger table
sqlExec createTriggerSql ()

pgListener |> PGListener.subscribe (channelName table) \notification -> do
sessions <- (get #sessions) <$> readIORef autoRefreshServer
sessions
|> filter (\session -> table `Set.member` (get #tables session))
Expand Down Expand Up @@ -213,7 +220,26 @@ gcSessions autoRefreshServer = do
isSessionExpired :: UTCTime -> AutoRefreshSession -> Bool
isSessionExpired now AutoRefreshSession { lastPing } = (now `diffUTCTime` lastPing) > (secondsToNominalDiffTime 60)

-- | Stops all async Auto Refresh subscriptions
stopAutoRefreshServer :: IORef AutoRefreshServer -> IO ()
stopAutoRefreshServer autoRefreshServer =
readIORef autoRefreshServer >>= (\autoRefreshServer -> autoRefreshServer |> get #subscriptions |> mapM_ uninterruptibleCancel)
-- | Returns the event name of the event that the pg notify trigger dispatches
channelName :: ByteString -> ByteString
channelName tableName = "did_change_" <> tableName

-- | Returns the sql code to set up a database trigger
notificationTrigger :: ByteString -> PG.Query
notificationTrigger tableName = PG.Query $ ""
<> "BEGIN;\n"
<> "CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $$"
<> "BEGIN\n"
<> " PERFORM pg_notify('" <> channelName tableName <> "', '');\n"
<> " RETURN new;"
<> "END;\n"
<> "$$ language plpgsql;"
<> "DROP TRIGGER IF EXISTS " <> insertTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> insertTriggerName <> " AFTER INSERT ON \"" <> tableName <> "\" FOR EACH STATEMENT EXECUTE PROCEDURE " <> functionName <> "();\n"
<> "DROP TRIGGER IF EXISTS " <> updateTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> updateTriggerName <> " AFTER UPDATE ON \"" <> tableName <> "\" FOR EACH STATEMENT EXECUTE PROCEDURE " <> functionName <> "();\n"
<> "DROP TRIGGER IF EXISTS " <> deleteTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> deleteTriggerName <> " AFTER DELETE ON \"" <> tableName <> "\" FOR EACH STATEMENT EXECUTE PROCEDURE " <> functionName <> "();\n"
<> "COMMIT;\n"
where
functionName = "notify_did_change_" <> tableName
insertTriggerName = "did_insert_" <> tableName
updateTriggerName = "did_update_" <> tableName
deleteTriggerName = "did_delete_" <> tableName
12 changes: 9 additions & 3 deletions IHP/AutoRefresh/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module IHP.AutoRefresh.Types where
import IHP.Prelude
import IHP.Controller.RequestContext
import Control.Concurrent.MVar (MVar)
import qualified IHP.PGListener as PGListener

data AutoRefreshState = AutoRefreshDisabled | AutoRefreshEnabled { sessionId :: !UUID }
data AutoRefreshSession = AutoRefreshSession
Expand All @@ -24,7 +25,12 @@ data AutoRefreshSession = AutoRefreshSession
, lastPing :: !UTCTime
}

data AutoRefreshServer = AutoRefreshServer { subscriptions :: [Async ()], sessions :: ![AutoRefreshSession], subscribedTables :: !(Set ByteString) }
data AutoRefreshServer = AutoRefreshServer
{ subscriptions :: [PGListener.Subscription]
, sessions :: ![AutoRefreshSession]
, subscribedTables :: !(Set ByteString)
, pgListener :: PGListener.PGListener
}

newAutoRefreshServer :: AutoRefreshServer
newAutoRefreshServer = AutoRefreshServer { subscriptions = [], sessions = [], subscribedTables = mempty }
newAutoRefreshServer :: PGListener.PGListener -> AutoRefreshServer
newAutoRefreshServer pgListener = AutoRefreshServer { subscriptions = [], sessions = [], subscribedTables = mempty, pgListener }
70 changes: 0 additions & 70 deletions IHP/PGNotify.hs

This file was deleted.

4 changes: 2 additions & 2 deletions IHP/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ run configBuilder = do
frameworkConfig <- buildFrameworkConfig configBuilder

sessionVault <- Vault.newKey
autoRefreshServer <- newIORef AutoRefresh.newAutoRefreshServer
modelContext <- initModelContext frameworkConfig
pgListener <- PGListener.init modelContext
autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)

let ?modelContext = modelContext
let ?applicationContext = ApplicationContext { modelContext = ?modelContext, session = sessionVault, autoRefreshServer, frameworkConfig, pgListener }
Expand All @@ -63,7 +63,7 @@ run configBuilder = do

run `finally` do
frameworkConfig |> get #logger |> get #cleanup
AutoRefresh.stopAutoRefreshServer autoRefreshServer
PGListener.stop pgListener

{-# INLINABLE run #-}

Expand Down
1 change: 0 additions & 1 deletion ihp.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ library
, IHP.AutoRefresh.Types
, IHP.AutoRefresh.View
, IHP.WebSocket
, IHP.PGNotify
, IHP.Mail
, IHP.Mail.Types
, IHP.MailPrelude
Expand Down

0 comments on commit 420890a

Please sign in to comment.