From 578562888e37478b086e150451cecedce907cf3c Mon Sep 17 00:00:00 2001 From: erin Date: Thu, 9 May 2013 16:55:58 -0700 Subject: [PATCH 1/3] simplify montaged with a bounded thread count --- Network/Riak/Montage/Main.hs | 21 ++++-- Network/Riak/Montage/Process.hs | 107 ++----------------------------- Network/Riak/Montage/Protocol.hs | 51 +++++++++------ 3 files changed, 50 insertions(+), 129 deletions(-) diff --git a/Network/Riak/Montage/Main.hs b/Network/Riak/Montage/Main.hs index 76063b6..a0fe67e 100644 --- a/Network/Riak/Montage/Main.hs +++ b/Network/Riak/Montage/Main.hs @@ -1,19 +1,20 @@ module Network.Riak.Montage.Main where +import Prelude hiding (log) import System.IO (hSetBuffering, BufferMode(..), stdout, stderr) import Control.Monad (forever, void) import Control.Concurrent (forkIO, threadDelay) -import qualified Data.Text as T +import Control.Concurrent (newEmptyMVar) import qualified Data.ByteString.Lazy.Char8 as B -import Network.StatsWeb (initStats, addCounter, incCounter, runStats, Stats) +import Network.StatsWeb (initStats, incCounter, runStats, Stats) import Data.Pool (Pool, createPool') import Network.Riak (defaultClient, connect, disconnect, Client(port), Connection) import Network.Riak.Montage.Protocol -import Network.Riak.Montage.Process (newEmptyConcurrentState, generateRequest) +import Network.Riak.Montage.Process (generateRequest) import Network.Riak.Montage.Types import Network.Riak.Montage.Util @@ -42,7 +43,7 @@ cfg = Config { , statsPrefix = "montage" , statsPort = 3334 , generator = generateRequest - , maxRequests = 700 + , maxRequests = 100 , requestTimeout = 30 , readOnly = False , logCommands = False @@ -71,17 +72,25 @@ runDaemon cfg' pools = do stats <- initStats (statsPrefix cfg') - state <- newEmptyConcurrentState + q <- newEmptyMVar let chooser' = chooser pools let logging = logger cfg' let runOn = "tcp://*:" ++ show (proxyPort cfg') - void $ forkIO $ loggedSupervise logging "network-zeromq" $ serveMontageZmq (generator cfg') runOn state logging chooser' stats (maxRequests cfg') (requestTimeout cfg') (readOnly cfg') (logCommands cfg') + let loop = processLoop q (generator cfg') logging chooser' stats (requestTimeout cfg') (readOnly cfg') (logCommands cfg') + mapM_ (makeChild logging loop) [1..(maxRequests cfg')] + + void $ forkIO $ loggedSupervise logging "network-zeromq" $ serveMontageZmq q runOn stats void $ forkIO $ loggedSupervise logging "timekeeper" $ timeKeeper stats void $ forkIO $ runStats stats (statsPort cfg') sleepForever +makeChild :: LogCallback -> IO () -> Int -> IO () +makeChild log loop n = void $ forkIO $ loggedSupervise log label $ loop + where + label = "node-" ++ show n + timeKeeper :: Stats -> IO a timeKeeper stats = forever $ do logError "TICK" diff --git a/Network/Riak/Montage/Process.hs b/Network/Riak/Montage/Process.hs index ee3a421..a22af03 100644 --- a/Network/Riak/Montage/Process.hs +++ b/Network/Riak/Montage/Process.hs @@ -3,19 +3,11 @@ module Network.Riak.Montage.Process where import Control.Monad (void, when) import Control.Concurrent (forkIO) import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar, MVar) -import Control.Concurrent.STM (newTVarIO) -import Control.Concurrent.STM.TVar (readTVar, writeTVar, TVar) -import Control.Concurrent.STM.TMVar (newEmptyTMVar, readTMVar, putTMVar, TMVar) -import Control.Concurrent.STM.Stats (trackNamedSTM) -import Control.Exception (finally, try, throw, SomeException) +import Control.Exception (try, throw, SomeException) import Text.ProtocolBuffers.WireMessage (messageGet, messagePut) import System.Timeout (timeout) -import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Maybe (fromJust) -import qualified Data.HashMap.Strict as HM -import Control.Applicative - import qualified Network.Riak.Types as RT import Text.ProtocolBuffers.Basic (toUtf8, utf8) import qualified Data.Text.Encoding as E @@ -46,56 +38,6 @@ import qualified Network.Riak.Montage.Proto.Montage.MontageCommand as MC import qualified Network.Riak.Montage.Proto.Montage.MontageCommandResponse as MCR import qualified Network.Riak.Montage.Proto.Montage.MontageDelete as MD --- how many requests before printing stats? -statsEvery :: Int -statsEvery = 100 - -data ConcurrentState = ConcurrentState { - concurrentCount :: TVar Int - , tick :: TVar Int - , ts :: TVar Double - , pipeline :: TVar (HM.HashMap (RT.Bucket, RT.Key) (TMVar (Either SomeException CommandResponse))) - } - -newEmptyConcurrentState :: IO ConcurrentState -newEmptyConcurrentState = ConcurrentState <$> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO HM.empty - -pipelineGet :: (MontageRiakValue t) => ConcurrentState -> ChainCommand t - -> (Int -> IO CommandResponse -> IO CommandResponse) - -> Int -> IO CommandResponse -> IO CommandResponse -pipelineGet state (ChainGet buck key Nothing) tracker requestTimeout' actuallyRun = do - opt <- eitherAnswerOrMandate - mans <- case opt of - Left tmv -> do - mans <- try $ tracker requestTimeout' actuallyRun - trackNamedSTM "non-pipelined" $ do - putTMVar tmv mans - hash <- readTVar (pipeline state) - let hash' = HM.delete hashkey hash - writeTVar (pipeline state) hash' - return mans - Right tmv -> do - logError $ "(key request for " ++ (show buck) ++ "/" ++ (show key) ++ " is pipelined)" - runWithTimeout requestTimeout' $ trackNamedSTM "pipelined" $ readTMVar tmv - - case mans of - Left (e::SomeException) -> throw e - Right ans -> return ans - where - eitherAnswerOrMandate = trackNamedSTM "eitherAnswerOrMandate" $ do - hash <- readTVar (pipeline state) - case HM.lookup hashkey hash of - Just tmv -> return $ Right tmv - Nothing -> do - newTmv <- newEmptyTMVar - let hash' = HM.insert hashkey newTmv hash - writeTVar (pipeline state) hash' - return $ Left newTmv - - hashkey = (buck, key) - -pipelineGet _ _ tracker requestTimeout' actuallyRun = tracker requestTimeout' actuallyRun - runWithTimeout :: Int -> IO a -> IO a runWithTimeout requestTimeout' action = do mr <- timeout requestTimeout action @@ -107,45 +49,6 @@ runWithTimeout requestTimeout' action = do where requestTimeout = requestTimeout' * 1000000 -trackConcurrency :: ConcurrentState -> Int -> Int -> IO CommandResponse - -> IO CommandResponse -trackConcurrency state maxRequests' requestTimeout' action = do - mcount <- maybeIncrCount - case mcount of - Just count -> do - logState count - finally (runWithTimeout requestTimeout' action) decrCount - Nothing -> error "concurrency limit hit" - where - maybeIncrCount = trackNamedSTM "maybeIncCount" $ do - count <- readTVar (concurrentCount state) - if (count < maxRequests') - then (writeTVar (concurrentCount state) (count + 1) >> return (Just $ count + 1)) - else (return Nothing) - - decrCount = trackNamedSTM "decrCount" $ do - count <- readTVar (concurrentCount state) - writeTVar (concurrentCount state) $ count - 1 - - logState count = do - now <- fmap realToFrac getPOSIXTime - mlog <- trackNamedSTM "logState" $ do - tick' <- fmap (+1) $ readTVar (tick state) - writeTVar (tick state) tick' - if tick' `mod` statsEvery == 0 - then do - last' <- readTVar (ts state) - writeTVar (ts state) now - return (Just last') - else (return Nothing) - case mlog of - Just last' -> do - let speed = (fromIntegral statsEvery) / (now - last') -- should never be /0 - logError ("{stats} concurrency=" ++ (show count) - ++ " rate=" ++ (show speed)) - --dumpSTMStats - Nothing -> return () - fromRight :: Either a b -> b fromRight (Right x) = x fromRight (Left _) = error "fromRight got Left!" @@ -224,17 +127,15 @@ statsChainCustom :: (MontageRiakValue r) => ChainCommand r -> Stats -> IO () statsChainCustom (ChainCustom cmd _) stats = incCounter (TL.toStrict $ format "requests.custom[type={}]" (Only cmd)) stats statsChainCustom _ _ = return () -processRequest :: (MontageRiakValue r) => ConcurrentState -> PoolChooser -> ChainCommand r -> Stats -> Int -> Int -> Bool -> Bool -> IO CommandResponse -processRequest state chooser' cmd stats maxRequests' requestTimeout' readOnly' logCommands' = do +processRequest :: (MontageRiakValue r) => PoolChooser -> ChainCommand r -> Stats -> Int -> Bool -> Bool -> IO CommandResponse +processRequest chooser' cmd stats requestTimeout' readOnly' logCommands' = do when (readOnly' && (not $ isRead cmd)) $ error "Non-read request issued to read-only montage" when (logCommands') $ logError $ "Running command " ++ show cmd - pipelineGet state cmd tracker requestTimeout' (processRequest' chooser' cmd stats) - where - tracker = trackConcurrency state maxRequests' + runWithTimeout requestTimeout' (processRequest' chooser' cmd stats) processRequest' :: (MontageRiakValue r) => PoolChooser -> ChainCommand r -> Stats -> IO CommandResponse processRequest' chooser' cmd stats = do diff --git a/Network/Riak/Montage/Protocol.hs b/Network/Riak/Montage/Protocol.hs index 58d2da0..52b2e30 100644 --- a/Network/Riak/Montage/Protocol.hs +++ b/Network/Riak/Montage/Protocol.hs @@ -3,7 +3,7 @@ module Network.Riak.Montage.Protocol where import System.ZMQ import System.UUID.V4 (uuid) import Control.Monad (forever) -import Control.Concurrent (forkIO) +import Control.Concurrent.MVar (MVar, takeMVar, putMVar) import qualified Data.ByteString.Lazy.Char8 as B import qualified Data.ByteString.Lazy as BW import qualified Data.ByteString.Char8 as S @@ -12,19 +12,20 @@ import Text.ProtocolBuffers.WireMessage (messageGet, messagePut) import Text.ProtocolBuffers.Basic (uFromString) import Data.Aeson (object, (.=)) -import Network.Riak.Montage.Util - -import Network.StatsWeb (Stats) +import Network.StatsWeb (Stats, incCounter) +import Network.Riak.Montage.Util +import Network.Riak.Montage.Types import Network.Riak.Montage.Proto.Montage.MontageEnvelope as ME import Network.Riak.Montage.Proto.Montage.MontageWireMessages import Network.Riak.Montage.Proto.Montage.MontageError -import Network.Riak.Montage.Types import Network.Riak.Montage.Process (processRequest, - serializeResponse, ConcurrentState(..)) + serializeResponse) + +type ZmqHandler = (S.ByteString -> ZmqCallback -> IO ()) -type ZmqHandler = (S.ByteString -> (BW.ByteString -> IO ()) -> IO ()) +type ZmqCallback = (BW.ByteString -> IO ()) runZmqRpc :: String -> ZmqHandler @@ -37,7 +38,7 @@ runZmqRpcWithContext :: Context -> String -> ZmqHandler -> IO () -runZmqRpcWithContext ctx binda call = do +runZmqRpcWithContext ctx binda serve = do withSocket ctx Router (\s -> do rand <- uuid let inproc = "inproc://" ++ (show rand) @@ -55,8 +56,7 @@ runZmqRpcWithContext ctx binda call = do send s "" [SndMore] send s m [] else do -- call - _ <- forkIO $ call m (zmqRpcReply ctx inproc zid) - return () + serve m (zmqRpcReply ctx inproc zid) ) zmqRpcReply :: Context @@ -71,29 +71,40 @@ zmqRpcReply c inproc retid out = do send s retid [] ) -serveMontageZmq :: (MontageRiakValue r) => - (MontageEnvelope -> ChainCommand r) -> - String -> ConcurrentState -> LogCallback -> - PoolChooser -> Stats -> Int -> Int -> Bool -> Bool -> IO () -serveMontageZmq generate runOn state logCB chooser' stats maxRequests' requestTimeout' readOnly' logCommands' = do - runZmqRpc runOn wrapMontage +serveMontageZmq :: MVar (S.ByteString, ZmqCallback) -> String -> Stats -> IO () +serveMontageZmq queueAny runOn stats = do + runZmqRpc runOn serveMontage + where + serveMontage m cb = do + putMVar queueAny (m, cb) + incCounter "requests" stats + +processLoop :: (MontageRiakValue r) => + MVar (S.ByteString, ZmqCallback) -> + (MontageEnvelope -> ChainCommand r) -> + LogCallback -> PoolChooser -> Stats -> + Int -> Bool -> Bool -> IO () +processLoop queueAny generate logCB chooser' stats requestTimeout' readOnly' logCommands' = do + forever $ do + (req, cb) <- takeMVar queueAny + wrapMontage req >>= cb where - wrapMontage m cb = do + wrapMontage m = do case messageGet $ sTl m of Right (env, x) | B.length x == 0 -> do res <- try $ do let !cmd = generate env - fmap (serializeResponse env) $ processRequest state chooser' cmd stats maxRequests' requestTimeout' readOnly' logCommands' + fmap (serializeResponse env) $ processRequest chooser' cmd stats requestTimeout' readOnly' logCommands' case res of Left (e :: SomeException) -> returnError (show e) $ msgid env - Right outenv -> cb $ messagePut outenv + Right outenv -> return . messagePut $ outenv _ -> returnError "Failed to decode MontageEnvelope" Nothing where returnError err msgid' = do logError err logCB "EXCEPTION" Nothing $ object ["error" .= err ] - cb $ messagePut $ MontageEnvelope { + return . messagePut $ MontageEnvelope { mtype = MONTAGE_ERROR , msg = messagePut $ MontageError (uFromString err) , msgid = msgid' From b1e00480fdd7ceb0d6624e3a7f5d4b8ede53f39d Mon Sep 17 00:00:00 2001 From: erin Date: Mon, 20 May 2013 14:24:49 -0700 Subject: [PATCH 2/3] use more explicit resource pool for catching resource exhaustion exceptions --- Network/Riak/Montage/Backend.hs | 8 ++++++-- montage.cabal | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Network/Riak/Montage/Backend.hs b/Network/Riak/Montage/Backend.hs index b087980..1afef6e 100644 --- a/Network/Riak/Montage/Backend.hs +++ b/Network/Riak/Montage/Backend.hs @@ -21,12 +21,16 @@ maxRetries = 3 -- We can't actually put in the type signatures below, see -- http://www.haskell.org/haskellwiki/Type_families#Injectivity.2C_type_inference.2C_and_ambiguity -retryOperation :: IO a -> IO a +retryOperation :: IO (Either SomeException a) -> IO a retryOperation op = retryOperation' 0 where --retryOperation' :: IO a -> Int -> IO a - retryOperation' retries = catch op (handleError retries) + retryOperation' retries = do + res <- op + case res of + Left (e :: SomeException) -> handleError retries e + Right res' -> return res' --handleError :: (Exception e) => Int -> e -> IO a handleError retries e = case retries > maxRetries of diff --git a/montage.cabal b/montage.cabal index 1a231c0..212fa4a 100644 --- a/montage.cabal +++ b/montage.cabal @@ -1,5 +1,5 @@ Name: montage -Version: 0.2.9 +Version: 0.3.3 Synopsis: Riak Resolution Proxy Homepage: http://github.com/bumptech/montage License: BSD3 @@ -51,7 +51,7 @@ Library ListLike, stm>=2.2, riak-bump>=0.7.3.6, - bump-resource-pool, + bump-resource-pool==0.2.1.2, unordered-containers >= 0.2.1 && < 0.3, zeromq-haskell>=0.8 && < 0.9, protocol-buffers >= 2.0.11, From a7eefd20c9d93d0b77aa99544108e6f8e6c295b9 Mon Sep 17 00:00:00 2001 From: erin Date: Mon, 20 May 2013 17:51:10 -0700 Subject: [PATCH 3/3] use safer, connection limiting riak pool --- Network/Riak/Montage/Main.hs | 7 ++++--- montage.cabal | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Network/Riak/Montage/Main.hs b/Network/Riak/Montage/Main.hs index a0fe67e..706bae2 100644 --- a/Network/Riak/Montage/Main.hs +++ b/Network/Riak/Montage/Main.hs @@ -51,17 +51,18 @@ cfg = Config { -- | Create a pool of Riak connection (usually used for constructing the second argument of @runDaemon@), given a port and a max number of connections. riakPoolOnPort :: String -> Int -> IO (Pool Connection) -riakPoolOnPort port' count = riakPoolOnPort' port' count 0 +riakPoolOnPort port' count = riakPoolOnPort' port' count 0 0 -- | Use tracking as the number of seconds delay before calculating pool resource and stm stats. Stats written to stderr. -riakPoolOnPort' :: String -> Int -> Int -> IO (Pool Connection) -riakPoolOnPort' port' count tracking = +riakPoolOnPort' :: String -> Int -> Int -> Int -> IO (Pool Connection) +riakPoolOnPort' port' count tracking reuse = createPool' (connect $ defaultClient {port = port'}) disconnect 1 -- stripes 10 -- timeout tracking -- tracking turned on if non zero + reuse -- max times to reuse a connection count -- max connections -- | Start the resolution proxy, where you define resolutions for your data @a@, and create one or more Riak connection pools @p@. diff --git a/montage.cabal b/montage.cabal index 212fa4a..6d50230 100644 --- a/montage.cabal +++ b/montage.cabal @@ -1,5 +1,5 @@ Name: montage -Version: 0.3.3 +Version: 0.3.4 Synopsis: Riak Resolution Proxy Homepage: http://github.com/bumptech/montage License: BSD3 @@ -51,7 +51,7 @@ Library ListLike, stm>=2.2, riak-bump>=0.7.3.6, - bump-resource-pool==0.2.1.2, + bump-resource-pool==0.2.1.3, unordered-containers >= 0.2.1 && < 0.3, zeromq-haskell>=0.8 && < 0.9, protocol-buffers >= 2.0.11,