Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ntf: smp server to replace last undelivered notifications #1351

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@
msgNtfNoSub' <- atomicSwapIORef (msgNtfNoSub ss) 0
msgNtfLost' <- atomicSwapIORef (msgNtfLost ss) 0
msgNtfExpired' <- atomicSwapIORef (msgNtfExpired ss) 0
msgNtfReplaced' <- atomicSwapIORef (msgNtfReplaced ss) 0
pRelays' <- getResetProxyStatsData pRelays
pRelaysOwn' <- getResetProxyStatsData pRelaysOwn
pMsgFwds' <- getResetProxyStatsData pMsgFwds
Expand Down Expand Up @@ -470,7 +471,8 @@
show ntfDeletedB',
show ntfSubB',
show msgNtfsB',
show msgNtfExpired'
show msgNtfExpired',
show msgNtfReplaced'
]
)
liftIO $ threadDelay' interval
Expand Down Expand Up @@ -583,6 +585,7 @@
putStat "msgNtfs" msgNtfs
putStat "msgNtfsB" msgNtfsB
putStat "msgNtfExpired" msgNtfExpired
putStat "msgNtfReplaced" msgNtfReplaced
putStat "qCount" qCount
putStat "msgCount" msgCount
putProxyStat "pRelays" pRelays
Expand Down Expand Up @@ -951,7 +954,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 957 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 957 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down Expand Up @@ -1455,10 +1458,11 @@
enqueueNotification :: NtfCreds -> Message -> M ()
enqueueNotification _ MessageQuota {} = pure ()
enqueueNotification NtfCreds {notifierId = nId, rcvNtfDhSecret} Message {msgId, msgTs} = do
-- stats <- asks serverStats
ns <- asks ntfStore
ntf <- mkMessageNotification msgId msgTs rcvNtfDhSecret
liftIO $ storeNtf ns nId ntf
hadPrevNtf <- liftIO $ storeNtf ns nId ntf
stats <- asks serverStats
when hadPrevNtf $ incStat $ msgNtfReplaced stats

mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M MsgNtf
mkMessageNotification msgId msgTs rcvNtfDhSecret = do
Expand Down Expand Up @@ -1703,12 +1707,13 @@
logInfo $ "saving notifications to file " <> T.pack f
NtfStore ns <- asks ntfStore
liftIO . withFile f WriteMode $ \h ->
readTVarIO ns >>= mapM_ (saveQueueNtfs h) . M.assocs
readTVarIO ns >>= mapM_ (saveQueueNtf h) . M.assocs
logInfo "notifications saved"
where
-- reverse on save, to save notifications in order, will become reversed again when restoring.
saveQueueNtfs h (nId, v) = BLD.hPutBuilder h . encodeNtfs nId . reverse =<< readTVarIO v
encodeNtfs nId = mconcat . map (\ntf -> BLD.byteString (strEncode $ NLRv1 nId ntf) <> BLD.char8 '\n')
saveQueueNtf h (nId, v) = do
ntf_ <- readTVarIO v
forM_ ntf_ $ \ntf -> BLD.hPutBuilder h $ encodeNtf nId ntf
encodeNtf nId ntf = BLD.byteString (strEncode $ NLRv1 nId ntf) <> BLD.char8 '\n'

restoreServerNtfs :: M Int
restoreServerNtfs =
Expand All @@ -1731,12 +1736,16 @@
where
restoreNtf ns old !expired s' = do
NLRv1 nId ntf <- liftEither . first (ntfErr "parsing") $ strDecode s
liftIO $ addToNtfs nId ntf
addToNtfs nId ntf
where
s = LB.toStrict s'
addToNtfs nId ntf@MsgNtf {ntfTs}
| systemSeconds ntfTs < old = pure (expired + 1)
| otherwise = storeNtf ns nId ntf $> expired
| otherwise = do
hadPrevNtf <- liftIO $ storeNtf ns nId ntf
stats <- asks serverStats
when hadPrevNtf $ incStat $ msgNtfReplaced stats
pure expired
ntfErr :: Show e => String -> e -> String
ntfErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)

Expand Down
50 changes: 28 additions & 22 deletions src/Simplex/Messaging/Server/NtfStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ module Simplex.Messaging.Server.NtfStore where

import Control.Concurrent.STM
import Control.Monad (foldM)
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust)
import Data.Time.Clock.System (SystemTime (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EncNMsgMeta, MsgId, NotifierId)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM

newtype NtfStore = NtfStore (TMap NotifierId (TVar [MsgNtf]))
newtype NtfStore = NtfStore (TMap NotifierId (TVar (Maybe MsgNtf)))

data MsgNtf = MsgNtf
{ ntfMsgId :: MsgId,
Expand All @@ -25,44 +27,48 @@ data MsgNtf = MsgNtf
ntfEncMeta :: EncNMsgMeta
}

storeNtf :: NtfStore -> NotifierId -> MsgNtf -> IO ()
storeNtf :: NtfStore -> NotifierId -> MsgNtf -> IO Bool
storeNtf (NtfStore ns) nId ntf = do
TM.lookupIO nId ns >>= atomically . maybe newNtfs (`modifyTVar'` (ntf :))
-- TODO coalesce messages here once the client is updated to process multiple messages
-- for single notification.
-- when (isJust prevNtf) $ incStat $ msgNtfReplaced stats
ntfVar_ <- TM.lookupIO nId ns
prevNtf <- atomically $ case ntfVar_ of
Just v -> swapTVar v (Just ntf)
Nothing -> newNtfs
pure $ isJust prevNtf
where
newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) (`modifyTVar'` (ntf :))
newNtfs = do
TM.lookup nId ns >>= \case
Just v -> swapTVar v (Just ntf)
Nothing -> TM.insertM nId (newTVar (Just ntf)) ns $> Nothing

deleteNtfs :: NtfStore -> NotifierId -> IO ()
deleteNtfs (NtfStore ns) nId = atomically $ TM.delete nId ns

flushNtfs :: NtfStore -> NotifierId -> IO [MsgNtf]
flushNtfs :: NtfStore -> NotifierId -> IO (Maybe MsgNtf)
flushNtfs (NtfStore ns) nId = do
TM.lookupIO nId ns >>= maybe (pure []) swapNtfs
TM.lookupIO nId ns >>= maybe (pure Nothing) swapNtfs
where
swapNtfs v =
readTVarIO v >>= \case
[] -> pure []
Nothing -> pure Nothing
-- if notifications available, atomically swap with empty array
_ -> atomically (swapTVar v [])
_ -> atomically (swapTVar v Nothing)

deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int
deleteExpiredNtfs (NtfStore ns) old =
foldM (\expired -> fmap (expired +) . expireQueue) 0 . M.keys =<< readTVarIO ns
where
expireQueue nId = TM.lookupIO nId ns >>= maybe (pure 0) expire
expire v = readTVarIO v >>= \case
[] -> pure 0
_ ->
atomically $ readTVar v >>= \case
[] -> pure 0
-- check the last message first, it is the earliest
ntfs | systemSeconds (ntfTs $ last $ ntfs) < old -> do
let !ntfs' = filter (\MsgNtf {ntfTs = ts} -> systemSeconds ts >= old) ntfs
writeTVar v ntfs'
pure $! length ntfs - length ntfs'
_ -> pure 0
expire v =
readTVarIO v >>= \case
Nothing -> pure 0
_ ->
atomically $
readTVar v >>= \case
Nothing -> pure 0
Just ntf | systemSeconds (ntfTs ntf) < old -> do
writeTVar v Nothing
pure 1
_ -> pure 0

data NtfLogRecord = NLRv1 NotifierId MsgNtf

Expand Down
10 changes: 10 additions & 0 deletions src/Simplex/Messaging/Server/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ data ServerStats = ServerStats
msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected)
msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full
msgNtfExpired :: IORef Int, -- expired
msgNtfReplaced :: IORef Int, -- replaced
pRelays :: ProxyStats,
pRelaysOwn :: ProxyStats,
pMsgFwds :: ProxyStats,
Expand Down Expand Up @@ -123,6 +124,7 @@ data ServerStatsData = ServerStatsData
_msgNtfNoSub :: Int,
_msgNtfLost :: Int,
_msgNtfExpired :: Int,
_msgNtfReplaced :: Int,
_pRelays :: ProxyStatsData,
_pRelaysOwn :: ProxyStatsData,
_pMsgFwds :: ProxyStatsData,
Expand Down Expand Up @@ -177,6 +179,7 @@ newServerStats ts = do
msgNtfNoSub <- newIORef 0
msgNtfLost <- newIORef 0
msgNtfExpired <- newIORef 0
msgNtfReplaced <- newIORef 0
pRelays <- newProxyStats
pRelaysOwn <- newProxyStats
pMsgFwds <- newProxyStats
Expand Down Expand Up @@ -228,6 +231,7 @@ newServerStats ts = do
msgNtfNoSub,
msgNtfLost,
msgNtfExpired,
msgNtfReplaced,
pRelays,
pRelaysOwn,
pMsgFwds,
Expand Down Expand Up @@ -281,6 +285,7 @@ getServerStatsData s = do
_msgNtfNoSub <- readIORef $ msgNtfNoSub s
_msgNtfLost <- readIORef $ msgNtfLost s
_msgNtfExpired <- readIORef $ msgNtfExpired s
_msgNtfReplaced <- readIORef $ msgNtfReplaced s
_pRelays <- getProxyStatsData $ pRelays s
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
Expand Down Expand Up @@ -332,6 +337,7 @@ getServerStatsData s = do
_msgNtfNoSub,
_msgNtfLost,
_msgNtfExpired,
_msgNtfReplaced,
_pRelays,
_pRelaysOwn,
_pMsgFwds,
Expand Down Expand Up @@ -386,6 +392,7 @@ setServerStats s d = do
writeIORef (msgNtfNoSub s) $! _msgNtfNoSub d
writeIORef (msgNtfLost s) $! _msgNtfLost d
writeIORef (msgNtfExpired s) $! _msgNtfExpired d
writeIORef (msgNtfReplaced s) $! _msgNtfReplaced d
setProxyStats (pRelays s) $! _pRelays d
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
setProxyStats (pMsgFwds s) $! _pMsgFwds d
Expand Down Expand Up @@ -438,6 +445,7 @@ instance StrEncoding ServerStatsData where
"msgNtfNoSub=" <> strEncode (_msgNtfNoSub d),
"msgNtfLost=" <> strEncode (_msgNtfLost d),
"msgNtfExpired=" <> strEncode (_msgNtfExpired d),
"msgNtfReplaced=" <> strEncode (_msgNtfReplaced d),
"activeQueues:",
strEncode (_activeQueues d),
"activeQueuesNtf:",
Expand Down Expand Up @@ -495,6 +503,7 @@ instance StrEncoding ServerStatsData where
_msgNtfNoSub <- opt "msgNtfNoSub="
_msgNtfLost <- opt "msgNtfLost="
_msgNtfExpired <- opt "msgNtfExpired="
_msgNtfReplaced <- opt "msgNtfReplaced="
_activeQueues <-
optional ("activeQueues:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
Expand Down Expand Up @@ -558,6 +567,7 @@ instance StrEncoding ServerStatsData where
_msgNtfNoSub,
_msgNtfLost,
_msgNtfExpired,
_msgNtfReplaced,
_activeQueues,
_activeQueuesNtf,
_pRelays,
Expand Down
2 changes: 1 addition & 1 deletion tests/AgentTests/NotificationTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ testNotificationSubscriptionNewConnection apns baseId alice bob =
get bob ##> ("", aliceId, INFO "alice's connInfo")
when (baseId == 3) $ void $ messageNotificationData alice apns
get alice ##> ("", bobId, CON)
when (baseId == 3) $ void $ messageNotificationData bob apns
-- when (baseId == 3) $ void $ messageNotificationData bob apns -- coalesced
get bob ##> ("", aliceId, CON)
-- bob sends message
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
Expand Down
6 changes: 3 additions & 3 deletions tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ testRestoreMessages at@(ATransport t) =

logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 74
logSize testServerStatsBackupFile `shouldReturn` 75
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1

Expand All @@ -633,7 +633,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 74
logSize testServerStatsBackupFile `shouldReturn` 75
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3

Expand All @@ -652,7 +652,7 @@ testRestoreMessages at@(ATransport t) =

logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 74
logSize testServerStatsBackupFile `shouldReturn` 75
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5

Expand Down
Loading