Skip to content

Avoid duplicate pre-insert checks; #2140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions src/Chainweb/Mempool/InMem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.DeepSeq
import Control.Error.Util (hush)
import Control.Exception (evaluate, mask_, throw)
import Control.Exception (evaluate, mask_, throw, bracket_)
import Control.Monad

import qualified Data.ByteString.Short as SB
Expand All @@ -47,7 +47,7 @@ import Data.Foldable (foldl', foldlM)
import Data.Function (on)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import Data.IORef
import Data.Maybe
import Data.Ord
import qualified Data.Set as S
Expand Down Expand Up @@ -84,6 +84,8 @@ import Numeric.AffineSpace
import Data.ByteString (ByteString)
import Data.Either (partitionEithers)
import Control.Lens
import Data.HashSet (HashSet)
import qualified Data.HashSet as HashSet

------------------------------------------------------------------------------
compareOnGasPrice :: TransactionConfig t -> t -> t -> Ordering
Expand All @@ -100,7 +102,8 @@ makeInMemPool :: InMemConfig t
makeInMemPool cfg = mask_ $ do
nonce <- randomIO
dataLock <- newInMemMempoolData >>= newMVar
return $! InMemoryMempool cfg dataLock nonce
pendingInsertionRef <- newIORef mempty
return $! InMemoryMempool cfg dataLock pendingInsertionRef nonce


------------------------------------------------------------------------------
Expand All @@ -126,7 +129,7 @@ toMempoolBackend logger mempool = do
, mempoolMember = memberInMem lockMVar
, mempoolLookup = lookupInMem tcfg lockMVar
, mempoolLookupEncoded = lookupEncodedInMem lockMVar
, mempoolInsert = insertInMem logger cfg lockMVar
, mempoolInsert = insertInMem logger cfg (_inmemInsertionPending mempool) lockMVar
, mempoolInsertCheck = insertCheckInMem cfg lockMVar
, mempoolInsertCheckVerbose = insertCheckVerboseInMem cfg lockMVar
, mempoolMarkValidated = markValidatedInMem logger tcfg lockMVar
Expand Down Expand Up @@ -500,7 +503,7 @@ insertCheckInMem'
. NFData t
=> InMemConfig t -- ^ in-memory config
-> MVar (InMemoryMempoolData t) -- ^ in-memory state
-> Vector t -- ^ new transactions
-> Vector (T2 TransactionHash t) -- ^ new transactions
-> IO (Vector (T2 TransactionHash t))
insertCheckInMem' cfg lock txs
| V.null txs = pure V.empty
Expand All @@ -510,42 +513,54 @@ insertCheckInMem' cfg lock txs
curTxIdx <- withMVarMasked lock $ readIORef . _inmemCurrentTxs

let withHashes :: Vector (T2 TransactionHash t)
withHashes = flip V.mapMaybe txs $ \tx ->
let !h = hasher tx
in (T2 h) <$> hush (validateOne cfg badmap curTxIdx now tx h)
withHashes = flip V.mapMaybe txs $ \(T2 h tx) ->
T2 h <$> hush (validateOne cfg badmap curTxIdx now tx h)

V.mapMaybe hush <$!> _inmemPreInsertBatchChecks cfg withHashes
where
txcfg = _inmemTxCfg cfg
hasher = txHasher txcfg

insertInMem
:: forall t logger. (NFData t, Logger logger)
=> logger
-> InMemConfig t -- ^ in-memory config
-> IORef (HashSet TransactionHash)
-> MVar (InMemoryMempoolData t) -- ^ in-memory state
-> InsertType
-> Vector t -- ^ new transactions
-> IO ()
insertInMem logger cfg lock runCheck txs0 = do
logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs0)
txhashes <- insertCheck
withMVarMasked lock $ \mdata -> do
pending <- readIORef (_inmemPending mdata)
logFunctionText logger Debug $ "insertInMem: pending txs: " <> sshow (HashMap.keys pending)
let cnt = HashMap.size pending
let txs = V.take (max 0 (maxNumPending - cnt)) txhashes
let T2 !pending' !newHashesDL = V.foldl' insOne (T2 pending id) txs
logFunctionText logger Debug $ "insertInMem: updated pending txs: " <> sshow (HashMap.keys pending')
let !newHashes = V.fromList $ newHashesDL []
writeIORef (_inmemPending mdata) $! force pending'
modifyIORef' (_inmemRecentLog mdata) $
recordRecentTransactions maxRecent newHashes
insertInMem logger cfg pendingInsertionRef lock runCheck txs0 = do
pendingInsertionsDirty <- readIORef pendingInsertionRef
let
txs' = flip V.mapMaybe txs0 $ \tx ->
let hash = hasher tx
in if not (HashSet.member hash pendingInsertionsDirty)
then Just (T2 hash tx)
else Nothing
bracket_
(atomicModifyIORef' pendingInsertionRef $ \pendingInsertions ->
(foldl' (flip HashSet.insert) pendingInsertions (sfst <$> txs'), ()))
(do
logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs')
txhashes <- insertCheck txs'
withMVarMasked lock $ \mdata -> do
pending <- readIORef (_inmemPending mdata)
logFunctionText logger Debug $ "insertInMem: pending txs: " <> sshow (HashMap.keys pending)
let cnt = HashMap.size pending
let txs = V.take (max 0 (maxNumPending - cnt)) txhashes
let T2 !pending' !newHashesDL = V.foldl' insOne (T2 pending id) txs
logFunctionText logger Debug $ "insertInMem: updated pending txs: " <> sshow (HashMap.keys pending')
let !newHashes = V.fromList $ newHashesDL []
writeIORef (_inmemPending mdata) $! force pending'
modifyIORef' (_inmemRecentLog mdata) $
recordRecentTransactions maxRecent newHashes
)
(atomicModifyIORef' pendingInsertionRef $ \pendingInsertions ->
(foldl' (flip HashSet.delete) pendingInsertions (sfst <$> txs'), ()))

where
insertCheck :: IO (Vector (T2 TransactionHash t))
insertCheck = case runCheck of
CheckedInsert -> insertCheckInMem' cfg lock txs0
UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0
insertCheck :: Vector (T2 TransactionHash t) -> IO (Vector (T2 TransactionHash t))
insertCheck txs' = case runCheck of
CheckedInsert -> insertCheckInMem' cfg lock txs'
UncheckedInsert -> return txs'

txcfg = _inmemTxCfg cfg
encodeTx = codecEncode (txCodec txcfg)
Expand Down
2 changes: 2 additions & 0 deletions src/Chainweb/Mempool/InMemTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import Chainweb.Mempool.CurrentTxs
import Chainweb.Mempool.Mempool
import Chainweb.Time (Micros(..), Time(..))
import Chainweb.Utils (T2)
import Data.HashSet (HashSet)

------------------------------------------------------------------------------
data PendingEntry = PendingEntry
Expand Down Expand Up @@ -80,6 +81,7 @@ data InMemConfig t = InMemConfig {
data InMemoryMempool t = InMemoryMempool {
_inmemCfg :: !(InMemConfig t)
, _inmemDataLock :: !(MVar (InMemoryMempoolData t))
, _inmemInsertionPending :: !(IORef (HashSet TransactionHash))
, _inmemNonce :: !ServerNonce
}

Expand Down
Loading