diff --git a/src/Chainweb/Chainweb/MempoolSyncClient.hs b/src/Chainweb/Chainweb/MempoolSyncClient.hs index 953c2cdcdc..f5a1c15c99 100644 --- a/src/Chainweb/Chainweb/MempoolSyncClient.hs +++ b/src/Chainweb/Chainweb/MempoolSyncClient.hs @@ -69,7 +69,9 @@ runMempoolSyncClient mgr memP2pConfig peerRes chain = bracket create destroy go create = do logg Debug "starting mempool p2p sync" p2pCreateNode v netId peer (logFunction syncLogger) peerDb mgr True $ - mempoolSyncP2pSession chain (_mempoolP2pConfigPollInterval memP2pConfig) + mempoolSyncP2pSession chain + (_mempoolP2pConfigPollInterval memP2pConfig) + (_mempoolP2pConfigSendNewTxsDelay memP2pConfig) go n = do -- Run P2P client node logg Debug "mempool sync p2p node initialized, starting session" @@ -91,18 +93,19 @@ runMempoolSyncClient mgr memP2pConfig peerRes chain = bracket create destroy go mempoolSyncP2pSession :: ChainResources logger -> Seconds + -> Micros -> P2pSession -mempoolSyncP2pSession chain (Seconds pollInterval) logg0 env _ = do +mempoolSyncP2pSession chain (Seconds pollInterval) (Micros sendNewTxsDelayMicros) logg0 env _ = do logg Debug "mempool sync session starting" - Mempool.syncMempools' logg syncIntervalUs pool peerMempool + Mempool.syncMempools' logg syncIntervalMicros (int sendNewTxsDelayMicros) pool peerMempool logg Debug "mempool sync session finished" return True where peerMempool = MPC.toMempool v cid txcfg env -- FIXME Potentially dangerous down-cast. - syncIntervalUs :: Int - syncIntervalUs = int pollInterval * 500000 + syncIntervalMicros :: Int + syncIntervalMicros = int pollInterval * 500000 remote = T.pack $ Sv.showBaseUrl $ Sv.baseUrl env logg d m = logg0 d $ T.concat ["[mempool sync@", remote, "]:", m] diff --git a/src/Chainweb/Mempool/InMem.hs b/src/Chainweb/Mempool/InMem.hs index 6f35df1aa5..313887c100 100644 --- a/src/Chainweb/Mempool/InMem.hs +++ b/src/Chainweb/Mempool/InMem.hs @@ -28,6 +28,7 @@ module Chainweb.Mempool.InMem import Control.Applicative ((<|>)) import Control.Concurrent.Async import Control.Concurrent.MVar +import Control.Concurrent.STM import Control.DeepSeq import Control.Error.Util (hush) import Control.Exception (bracket, evaluate, mask_, throw) @@ -35,6 +36,7 @@ import Control.Monad import Data.Aeson import Data.Bifunctor (bimap) +import Data.ByteString (ByteString) import qualified Data.ByteString.Short as SB import Data.Decimal #if MIN_VERSION_base(4,20,0) @@ -79,7 +81,6 @@ import Chainweb.Version (ChainwebVersion) import qualified Pact.Types.ChainMeta as P import Numeric.AffineSpace -import Data.ByteString (ByteString) ------------------------------------------------------------------------------ compareOnGasPrice :: TransactionConfig t -> t -> t -> Ordering @@ -96,12 +97,12 @@ makeInMemPool :: InMemConfig t makeInMemPool cfg = mask_ $ do nonce <- randomIO dataLock <- newInMemMempoolData >>= newMVar - return $! InMemoryMempool cfg dataLock nonce + newTxsVar <- newTVarIO [] + return $! InMemoryMempool cfg dataLock newTxsVar nonce destroyInMemPool :: InMemoryMempool t -> IO () destroyInMemPool = const $ return () - ------------------------------------------------------------------------------ newInMemMempoolData :: IO (InMemoryMempoolData t) newInMemMempoolData = @@ -132,24 +133,27 @@ toMempoolBackend logger mempool = do , mempoolGetBlock = getBlock , mempoolPrune = prune , mempoolGetPendingTransactions = getPending + , mempoolGetNewTransactions = getNew , mempoolClear = clear } where cfg = _inmemCfg mempool nonce = _inmemNonce mempool lockMVar = _inmemDataLock mempool + newTxsVar = _inmemNewTxs mempool InMemConfig tcfg _ _ _ _ _ _ = cfg member = memberInMem lockMVar lookup = lookupInMem tcfg lockMVar lookupEncoded = lookupEncodedInMem lockMVar - insert = insertInMem cfg lockMVar + insert = insertInMem cfg lockMVar newTxsVar insertCheck = insertCheckInMem cfg lockMVar markValidated = markValidatedInMem logger tcfg lockMVar addToBadList = addToBadListInMem lockMVar checkBadList = checkBadListInMem lockMVar getBlock = getBlockInMem logger cfg lockMVar getPending = getPendingInMem cfg nonce lockMVar + getNew = getNewInMem newTxsVar prune = pruneInMem lockMVar clear = clearInMem lockMVar @@ -479,10 +483,11 @@ insertInMem . NFData t => InMemConfig t -- ^ in-memory config -> MVar (InMemoryMempoolData t) -- ^ in-memory state + -> TVar [t] -> InsertType -> Vector t -- ^ new transactions -> IO () -insertInMem cfg lock runCheck txs0 = do +insertInMem cfg lock newTxsVar runCheck txs0 = do txhashes <- insertCheck withMVarMasked lock $ \mdata -> do pending <- readIORef (_inmemPending mdata) @@ -495,9 +500,13 @@ insertInMem cfg lock runCheck txs0 = do recordRecentTransactions maxRecent newHashes where insertCheck :: IO (Vector (T2 TransactionHash t)) - insertCheck = if runCheck == CheckedInsert - then insertCheckInMem' cfg lock txs0 - else return $! V.map (\tx -> T2 (hasher tx) tx) txs0 + insertCheck = case runCheck of + CheckedInsert -> insertCheckInMem' cfg lock txs0 + UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0 + NewInsert -> do + -- we trust the caller to have done all necessary pre-insert checks + atomically $ modifyTVar newTxsVar $ (V.toList txs0 ++) + return $! V.map (\tx -> T2 (hasher tx) tx) txs0 txcfg = _inmemTxCfg cfg encodeTx = codecEncode (txCodec txcfg) @@ -706,6 +715,13 @@ getPendingInMem cfg nonce lock since callback = do sendChunk _ 0 = return () sendChunk dl _ = callback $! V.fromList $ dl [] +getNewInMem :: TVar [t] -> IO [t] +getNewInMem v = atomically $ do + ts <- readTVar v + guard (not $ null ts) + writeTVar v [] + return ts + ------------------------------------------------------------------------------ clearInMem :: MVar (InMemoryMempoolData t) -> IO () clearInMem lock = newInMemMempoolData >>= void . swapMVar lock diff --git a/src/Chainweb/Mempool/InMemTypes.hs b/src/Chainweb/Mempool/InMemTypes.hs index faac2b98a8..2b738ea246 100644 --- a/src/Chainweb/Mempool/InMemTypes.hs +++ b/src/Chainweb/Mempool/InMemTypes.hs @@ -43,6 +43,7 @@ import Chainweb.Mempool.CurrentTxs import Chainweb.Mempool.Mempool import Chainweb.Time (Micros(..), Time(..)) import Chainweb.Utils (T2) +import Control.Concurrent.STM ------------------------------------------------------------------------------ data PendingEntry = PendingEntry @@ -80,6 +81,11 @@ data InMemConfig t = InMemConfig { data InMemoryMempool t = InMemoryMempool { _inmemCfg :: !(InMemConfig t) , _inmemDataLock :: !(MVar (InMemoryMempoolData t)) + , _inmemNewTxs :: !(TVar [t]) + -- ^ The set of new transactions that have been submitted to this mempool + -- by a user and not by another mempool, and which haven't been sent to + -- other mempools yet. This is used to quickly send these transactions + -- to other mempool without waiting for the usual polling period. , _inmemNonce :: !ServerNonce } @@ -102,11 +108,12 @@ data InMemoryMempoolData t = InMemoryMempoolData { -- possibly have to pay gas for it several times. , _inmemCurrentTxs :: !(IORef CurrentTxs) - -- ^ The set of non-expired transactions that have been addeded to a block. + -- ^ The set of non-expired transactions that have been added to a block. -- Transactions are remove from the set of pending transactions when they -- are added to a block. This set is used to prevent transactions from being -- re-inserts when synchronizing with nodes that haven't yet validated the -- block. + } ------------------------------------------------------------------------------ diff --git a/src/Chainweb/Mempool/Mempool.hs b/src/Chainweb/Mempool/Mempool.hs index 2bbb3b7bfb..1573a2dde5 100644 --- a/src/Chainweb/Mempool/Mempool.hs +++ b/src/Chainweb/Mempool/Mempool.hs @@ -92,7 +92,7 @@ module Chainweb.Mempool.Mempool import Control.DeepSeq (NFData) import Control.Exception import Control.Lens hiding ((.=)) -import Control.Monad (replicateM, unless) +import Control.Monad (replicateM, unless, forever) import Crypto.Hash (hash) import Crypto.Hash.Algorithms (SHA512t_256) @@ -142,6 +142,7 @@ import Chainweb.Transaction import Chainweb.Utils import Chainweb.Utils.Serialization import Data.LogMessage (LogFunctionText) +import Control.Concurrent.Async ------------------------------------------------------------------------------ data LookupResult t = Missing @@ -215,7 +216,7 @@ data TransactionConfig t = TransactionConfig { type MempoolTxId = Int64 type ServerNonce = Int type HighwaterMark = (ServerNonce, MempoolTxId) -data InsertType = CheckedInsert | UncheckedInsert +data InsertType = CheckedInsert | UncheckedInsert | NewInsert deriving (Show, Eq) data InsertError = InsertErrorDuplicate @@ -310,6 +311,10 @@ data MempoolBackend t = MempoolBackend { -- | Discard any expired transactions. , mempoolPrune :: IO () + -- | Returns only transactions that have been newly constructed to send them + -- to the rest of the network for the first time. + , mempoolGetNewTransactions :: IO [t] + -- | given a previous high-water mark and a chunk callback function, loops -- through the pending candidate transactions and supplies the hashes to -- the callback in chunks. No ordering of hashes is presupposed. Returns @@ -342,6 +347,7 @@ noopMempool = do , mempoolGetBlock = noopGetBlock , mempoolPrune = return () , mempoolGetPendingTransactions = noopGetPending + , mempoolGetNewTransactions = return [] , mempoolClear = noopClear } where @@ -407,6 +413,9 @@ data SyncState = SyncState { , _syncTooMany :: !Bool } +-- sendNewTx :: t -> MempoolBackend t -> IO () + + -- | Pulls any missing pending transactions from a remote mempool. -- -- The initial sync procedure: @@ -425,12 +434,14 @@ syncMempools' => LogFunctionText -> Int -- ^ polling interval in microseconds + -> Int + -- ^ sending new txs delay in microseconds -> MempoolBackend t -- ^ local mempool -> MempoolBackend t -- ^ remote mempool -> IO () -syncMempools' log0 us localMempool remoteMempool = sync +syncMempools' log0 syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool = sync where maxCnt = 5000 @@ -489,7 +500,11 @@ syncMempools' log0 us localMempool remoteMempool = sync deb :: Text -> IO () deb = log0 Debug - sync = finally (initialSync >>= subsequentSync) (deb "sync exiting") + sync = finally + (do + hw <- initialSync + subsequentSync hw `race_` sendNewTxs) + (deb "sync exiting") initialSync = do deb "Get full list of pending hashes from remote" @@ -525,9 +540,17 @@ syncMempools' log0 us localMempool remoteMempool = sync , " new remote hashes need to be fetched" ] traverse_ fetchMissing missingChunks - approximateThreadDelay us + approximateThreadDelay syncDelayMicros subsequentSync remoteHw' + sendNewTxs = forever $ do + newTxs <- mempoolGetNewTransactions localMempool + deb $ + "Sending newly constructed transactions: " <> + sshow (txHasher (mempoolTxConfig localMempool) <$> newTxs) + mempoolInsert remoteMempool CheckedInsert $! V.fromList newTxs + approximateThreadDelay sendNewTxsDelayMicros + -- get pending hashes from remote since the given (optional) high water mark fetchSince oldRemoteHw = do -- Intialize and collect SyncState @@ -563,11 +586,12 @@ syncMempools :: Show t => LogFunctionText -> Int -- ^ polling interval in microseconds + -> Int -- ^ new tx sending delay in microseconds -> MempoolBackend t -- ^ local mempool -> MempoolBackend t -- ^ remote mempool -> IO () -syncMempools log us localMempool remoteMempool = - syncMempools' log us localMempool remoteMempool +syncMempools log syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool = + syncMempools' log syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool ------------------------------------------------------------------------------ -- | Raw/unencoded transaction hashes. diff --git a/src/Chainweb/Mempool/P2pConfig.hs b/src/Chainweb/Mempool/P2pConfig.hs index b4e24fd893..011a0d6bfb 100644 --- a/src/Chainweb/Mempool/P2pConfig.hs +++ b/src/Chainweb/Mempool/P2pConfig.hs @@ -4,6 +4,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE NumericUnderscores #-} -- | -- Module: Chainweb.Mempool.P2pConfig @@ -19,6 +20,7 @@ module Chainweb.Mempool.P2pConfig , mempoolP2pConfigMaxSessionCount , mempoolP2pConfigSessionTimeout , mempoolP2pConfigPollInterval +, mempoolP2pConfigSendNewTxsDelay , defaultMempoolP2pConfig , pMempoolP2pConfig ) where @@ -42,6 +44,7 @@ data MempoolP2pConfig = MempoolP2pConfig , _mempoolP2pConfigSessionTimeout :: !Seconds -- ^ timeout in seconds , _mempoolP2pConfigPollInterval :: !Seconds + , _mempoolP2pConfigSendNewTxsDelay :: !Micros } deriving (Show, Eq, Ord, Generic) @@ -51,7 +54,8 @@ defaultMempoolP2pConfig :: MempoolP2pConfig defaultMempoolP2pConfig = MempoolP2pConfig { _mempoolP2pConfigMaxSessionCount = 6 , _mempoolP2pConfigSessionTimeout = 300 - , _mempoolP2pConfigPollInterval = 30 + , _mempoolP2pConfigPollInterval = Seconds 30 + , _mempoolP2pConfigSendNewTxsDelay = Micros 500_000 } instance ToJSON MempoolP2pConfig where @@ -72,6 +76,7 @@ instance FromJSON MempoolP2pConfig where <$> o .: "maxSessionCount" <*> o .: "sessionTimeout" <*> o .: "pollInterval" + <*> o .: "sendNewTxsDelay" pMempoolP2pConfig :: MParser MempoolP2pConfig pMempoolP2pConfig = id @@ -84,3 +89,6 @@ pMempoolP2pConfig = id <*< mempoolP2pConfigPollInterval .:: textOption % long "mempool-p2p-poll-interval" <> help "poll interval for synchronizing mempools in seconds" + <*< mempoolP2pConfigPollInterval .:: textOption + % long "mempool-p2p-send-new-txs-delay" + <> help "delay between sending new transactions to other mempools in micros" diff --git a/src/Chainweb/Mempool/RestAPI/Client.hs b/src/Chainweb/Mempool/RestAPI/Client.hs index ad15d40776..af64adc573 100644 --- a/src/Chainweb/Mempool/RestAPI/Client.hs +++ b/src/Chainweb/Mempool/RestAPI/Client.hs @@ -64,6 +64,7 @@ toMempool version chain txcfg env = , mempoolCheckBadList = const unsupported , mempoolGetBlock = \_ _ _ _ -> unsupported , mempoolGetPendingTransactions = getPending + , mempoolGetNewTransactions = unsupported , mempoolPrune = unsupported , mempoolClear = clear } diff --git a/src/Chainweb/Pact/RestAPI/Server.hs b/src/Chainweb/Pact/RestAPI/Server.hs index f6dfd0bcfa..aac83e721c 100644 --- a/src/Chainweb/Pact/RestAPI/Server.hs +++ b/src/Chainweb/Pact/RestAPI/Server.hs @@ -258,7 +258,7 @@ sendHandler logger v cid mempool (SubmitBatch cmds) = Handler $ do let txs = V.fromList $ NEL.toList enriched -- If any of the txs in the batch fail validation, we reject them all. liftIO (mempoolInsertCheck mempool txs) >>= checkResult - liftIO (mempoolInsert mempool UncheckedInsert txs) + liftIO (mempoolInsert mempool NewInsert txs) return $! RequestKeys $ NEL.map cmdToRequestKey enriched Left err -> failWith $ "Validation failed: " <> T.pack err where diff --git a/test/Chainweb/Test/Mempool/Sync.hs b/test/Chainweb/Test/Mempool/Sync.hs index 5a123429bd..110868bae9 100644 --- a/test/Chainweb/Test/Mempool/Sync.hs +++ b/test/Chainweb/Test/Mempool/Sync.hs @@ -34,6 +34,7 @@ import Chainweb.Utils (Codec(..)) tests :: TestTree tests = testGroup "Chainweb.Mempool.sync" [ mempoolProperty "Mempool.syncMempools" gen propSync $ MempoolWithFunc wf + , mempoolProperty "Mempool.syncMempoolsSendNew" (pick arbitrary) propSendNew $ MempoolWithFunc wf ] where wf :: (InsertCheck -> MempoolBackend MockTx -> IO a) -> IO a @@ -69,6 +70,27 @@ testInMemCfg :: InMemConfig MockTx testInMemCfg = InMemConfig txcfg mockBlockGasLimit 0 2048 Right (pure . V.map Right) (1024 * 10) +propSendNew + :: Set MockTx + -> InsertCheck + -> MempoolBackend MockTx + -> IO (Either String ()) +propSendNew txs _ localMempool' = + withInMemoryMempool testInMemCfg (barebonesTestVersion singletonChainGraph) $ \remoteMempool -> do + inserted <- newEmptyMVar + -- make a mempool which puts to the above mvar when all txs are inserted + localMempool <- timebomb (V.length txsV) (putMVar inserted ()) localMempool' + mempoolInsert localMempool NewInsert txsV + -- never do a sync, only do sendNew + let sync = syncMempools (\_ _ -> return ()) maxBound 0 localMempool remoteMempool + -- wait 10 seconds for the insert to happen + m <- timeout 10_000_000 $ do + Async.withAsync sync $ \_ -> do + takeMVar inserted + maybe (fail "timeout") (pure . Right) m + where + txsV = V.fromList $ Set.toList txs + propSync :: (Set MockTx, Set MockTx , Set MockTx) -> InsertCheck @@ -90,7 +112,7 @@ propSync (txs, missing, later) _ localMempool' = localMempool <- timebomb nmissing onInitialSyncFinished =<< timebomb (nmissing + nlater) onFinalSyncFinished localMempool' - let syncThread = syncMempools noLog 10 localMempool remoteMempool + let syncThread = syncMempools noLog 10 10 localMempool remoteMempool -- expect remote to deliver transactions during sync.