diff --git a/src/Chainweb/Mempool/InMem.hs b/src/Chainweb/Mempool/InMem.hs index 2c86640c5b..84c2a527be 100644 --- a/src/Chainweb/Mempool/InMem.hs +++ b/src/Chainweb/Mempool/InMem.hs @@ -34,7 +34,7 @@ import Control.Concurrent.Async import Control.Concurrent.MVar import Control.DeepSeq import Control.Error.Util (hush) -import Control.Exception (evaluate, mask_, throw) +import Control.Exception (evaluate, mask_, throw, bracket_) import Control.Monad import qualified Data.ByteString.Short as SB @@ -47,7 +47,7 @@ import Data.Foldable (foldl', foldlM) import Data.Function (on) import Data.HashMap.Strict (HashMap) import qualified Data.HashMap.Strict as HashMap -import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef) +import Data.IORef import Data.Maybe import Data.Ord import qualified Data.Set as S @@ -84,6 +84,8 @@ import Numeric.AffineSpace import Data.ByteString (ByteString) import Data.Either (partitionEithers) import Control.Lens +import Data.HashSet (HashSet) +import qualified Data.HashSet as HashSet ------------------------------------------------------------------------------ compareOnGasPrice :: TransactionConfig t -> t -> t -> Ordering @@ -100,7 +102,8 @@ makeInMemPool :: InMemConfig t makeInMemPool cfg = mask_ $ do nonce <- randomIO dataLock <- newInMemMempoolData >>= newMVar - return $! InMemoryMempool cfg dataLock nonce + pendingInsertionRef <- newIORef mempty + return $! InMemoryMempool cfg dataLock pendingInsertionRef nonce ------------------------------------------------------------------------------ @@ -126,7 +129,7 @@ toMempoolBackend logger mempool = do , mempoolMember = memberInMem lockMVar , mempoolLookup = lookupInMem tcfg lockMVar , mempoolLookupEncoded = lookupEncodedInMem lockMVar - , mempoolInsert = insertInMem logger cfg lockMVar + , mempoolInsert = insertInMem logger cfg (_inmemInsertionPending mempool) lockMVar , mempoolInsertCheck = insertCheckInMem cfg lockMVar , mempoolInsertCheckVerbose = insertCheckVerboseInMem cfg lockMVar , mempoolMarkValidated = markValidatedInMem logger tcfg lockMVar @@ -500,7 +503,7 @@ insertCheckInMem' . NFData t => InMemConfig t -- ^ in-memory config -> MVar (InMemoryMempoolData t) -- ^ in-memory state - -> Vector t -- ^ new transactions + -> Vector (T2 TransactionHash t) -- ^ new transactions -> IO (Vector (T2 TransactionHash t)) insertCheckInMem' cfg lock txs | V.null txs = pure V.empty @@ -510,42 +513,54 @@ insertCheckInMem' cfg lock txs curTxIdx <- withMVarMasked lock $ readIORef . _inmemCurrentTxs let withHashes :: Vector (T2 TransactionHash t) - withHashes = flip V.mapMaybe txs $ \tx -> - let !h = hasher tx - in (T2 h) <$> hush (validateOne cfg badmap curTxIdx now tx h) + withHashes = flip V.mapMaybe txs $ \(T2 h tx) -> + T2 h <$> hush (validateOne cfg badmap curTxIdx now tx h) V.mapMaybe hush <$!> _inmemPreInsertBatchChecks cfg withHashes - where - txcfg = _inmemTxCfg cfg - hasher = txHasher txcfg insertInMem :: forall t logger. (NFData t, Logger logger) => logger -> InMemConfig t -- ^ in-memory config + -> IORef (HashSet TransactionHash) -> MVar (InMemoryMempoolData t) -- ^ in-memory state -> InsertType -> Vector t -- ^ new transactions -> IO () -insertInMem logger cfg lock runCheck txs0 = do - logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs0) - txhashes <- insertCheck - withMVarMasked lock $ \mdata -> do - pending <- readIORef (_inmemPending mdata) - logFunctionText logger Debug $ "insertInMem: pending txs: " <> sshow (HashMap.keys pending) - let cnt = HashMap.size pending - let txs = V.take (max 0 (maxNumPending - cnt)) txhashes - let T2 !pending' !newHashesDL = V.foldl' insOne (T2 pending id) txs - logFunctionText logger Debug $ "insertInMem: updated pending txs: " <> sshow (HashMap.keys pending') - let !newHashes = V.fromList $ newHashesDL [] - writeIORef (_inmemPending mdata) $! force pending' - modifyIORef' (_inmemRecentLog mdata) $ - recordRecentTransactions maxRecent newHashes +insertInMem logger cfg pendingInsertionRef lock runCheck txs0 = do + pendingInsertionsDirty <- readIORef pendingInsertionRef + let + txs' = flip V.mapMaybe txs0 $ \tx -> + let hash = hasher tx + in if not (HashSet.member hash pendingInsertionsDirty) + then Just (T2 hash tx) + else Nothing + bracket_ + (atomicModifyIORef' pendingInsertionRef $ \pendingInsertions -> + (foldl' (flip HashSet.insert) pendingInsertions (sfst <$> txs'), ())) + (do + logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs') + txhashes <- insertCheck txs' + withMVarMasked lock $ \mdata -> do + pending <- readIORef (_inmemPending mdata) + logFunctionText logger Debug $ "insertInMem: pending txs: " <> sshow (HashMap.keys pending) + let cnt = HashMap.size pending + let txs = V.take (max 0 (maxNumPending - cnt)) txhashes + let T2 !pending' !newHashesDL = V.foldl' insOne (T2 pending id) txs + logFunctionText logger Debug $ "insertInMem: updated pending txs: " <> sshow (HashMap.keys pending') + let !newHashes = V.fromList $ newHashesDL [] + writeIORef (_inmemPending mdata) $! force pending' + modifyIORef' (_inmemRecentLog mdata) $ + recordRecentTransactions maxRecent newHashes + ) + (atomicModifyIORef' pendingInsertionRef $ \pendingInsertions -> + (foldl' (flip HashSet.delete) pendingInsertions (sfst <$> txs'), ())) + where - insertCheck :: IO (Vector (T2 TransactionHash t)) - insertCheck = case runCheck of - CheckedInsert -> insertCheckInMem' cfg lock txs0 - UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0 + insertCheck :: Vector (T2 TransactionHash t) -> IO (Vector (T2 TransactionHash t)) + insertCheck txs' = case runCheck of + CheckedInsert -> insertCheckInMem' cfg lock txs' + UncheckedInsert -> return txs' txcfg = _inmemTxCfg cfg encodeTx = codecEncode (txCodec txcfg) diff --git a/src/Chainweb/Mempool/InMemTypes.hs b/src/Chainweb/Mempool/InMemTypes.hs index e95fb936e8..8130124e20 100644 --- a/src/Chainweb/Mempool/InMemTypes.hs +++ b/src/Chainweb/Mempool/InMemTypes.hs @@ -43,6 +43,7 @@ import Chainweb.Mempool.CurrentTxs import Chainweb.Mempool.Mempool import Chainweb.Time (Micros(..), Time(..)) import Chainweb.Utils (T2) +import Data.HashSet (HashSet) ------------------------------------------------------------------------------ data PendingEntry = PendingEntry @@ -80,6 +81,7 @@ data InMemConfig t = InMemConfig { data InMemoryMempool t = InMemoryMempool { _inmemCfg :: !(InMemConfig t) , _inmemDataLock :: !(MVar (InMemoryMempoolData t)) + , _inmemInsertionPending :: !(IORef (HashSet TransactionHash)) , _inmemNonce :: !ServerNonce }