Skip to content
This repository has been archived by the owner on May 27, 2022. It is now read-only.

Simplify thread management with a bounded thread count #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Network/Riak/Montage/Backend.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ maxRetries = 3

-- We can't actually put in the type signatures below, see
-- http://www.haskell.org/haskellwiki/Type_families#Injectivity.2C_type_inference.2C_and_ambiguity
retryOperation :: IO a -> IO a
retryOperation :: IO (Either SomeException a) -> IO a
retryOperation op =
retryOperation' 0
where
--retryOperation' :: IO a -> Int -> IO a
retryOperation' retries = catch op (handleError retries)
retryOperation' retries = do
res <- op
case res of
Left (e :: SomeException) -> handleError retries e
Right res' -> return res'

--handleError :: (Exception e) => Int -> e -> IO a
handleError retries e = case retries > maxRetries of
Expand Down
28 changes: 19 additions & 9 deletions Network/Riak/Montage/Main.hs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
module Network.Riak.Montage.Main where

import Prelude hiding (log)
import System.IO (hSetBuffering, BufferMode(..), stdout, stderr)
import Control.Monad (forever, void)
import Control.Concurrent (forkIO, threadDelay)
import qualified Data.Text as T
import Control.Concurrent (newEmptyMVar)
import qualified Data.ByteString.Lazy.Char8 as B

import Network.StatsWeb (initStats, addCounter, incCounter, runStats, Stats)
import Network.StatsWeb (initStats, incCounter, runStats, Stats)

import Data.Pool (Pool, createPool')
import Network.Riak (defaultClient, connect, disconnect,
Client(port), Connection)

import Network.Riak.Montage.Protocol
import Network.Riak.Montage.Process (newEmptyConcurrentState, generateRequest)
import Network.Riak.Montage.Process (generateRequest)
import Network.Riak.Montage.Types
import Network.Riak.Montage.Util

Expand Down Expand Up @@ -42,25 +43,26 @@ cfg = Config {
, statsPrefix = "montage"
, statsPort = 3334
, generator = generateRequest
, maxRequests = 700
, maxRequests = 100
, requestTimeout = 30
, readOnly = False
, logCommands = False
}

-- | Create a pool of Riak connections (usually used for constructing the second argument of @runDaemon@), given a port and a maximum number of connections.
riakPoolOnPort :: String -> Int -> IO (Pool Connection)
riakPoolOnPort port' count = riakPoolOnPort' port' count 0
riakPoolOnPort port' count = riakPoolOnPort' port' count 0 0

-- | Use @tracking@ as the delay, in seconds, before calculating pool resource and STM stats. Stats are written to stderr.
riakPoolOnPort' :: String -> Int -> Int -> IO (Pool Connection)
riakPoolOnPort' port' count tracking =
riakPoolOnPort' :: String -> Int -> Int -> Int -> IO (Pool Connection)
riakPoolOnPort' port' count tracking reuse =
createPool'
(connect $ defaultClient {port = port'})
disconnect
1 -- stripes
10 -- timeout
tracking -- tracking turned on if non zero
reuse -- max times to reuse a connection
count -- max connections

-- | Start the resolution proxy, where you define resolutions for your data @a@, and create one or more Riak connection pools @p@.
Expand All @@ -71,17 +73,25 @@ runDaemon cfg' pools = do

stats <- initStats (statsPrefix cfg')

state <- newEmptyConcurrentState
q <- newEmptyMVar

let chooser' = chooser pools
let logging = logger cfg'
let runOn = "tcp://*:" ++ show (proxyPort cfg')

void $ forkIO $ loggedSupervise logging "network-zeromq" $ serveMontageZmq (generator cfg') runOn state logging chooser' stats (maxRequests cfg') (requestTimeout cfg') (readOnly cfg') (logCommands cfg')
let loop = processLoop q (generator cfg') logging chooser' stats (requestTimeout cfg') (readOnly cfg') (logCommands cfg')
mapM_ (makeChild logging loop) [1..(maxRequests cfg')]

void $ forkIO $ loggedSupervise logging "network-zeromq" $ serveMontageZmq q runOn stats
void $ forkIO $ loggedSupervise logging "timekeeper" $ timeKeeper stats
void $ forkIO $ runStats stats (statsPort cfg')
sleepForever

-- | Fork one worker thread that runs the given loop.
--
-- The loop is wrapped in 'loggedSupervise' under the label @"node-N"@,
-- where @N@ is the index passed as the third argument. The forked
-- thread's id is discarded.
makeChild :: LogCallback -> IO () -> Int -> IO ()
makeChild logCB worker idx =
    void . forkIO $ loggedSupervise logCB ("node-" ++ show idx) worker

timeKeeper :: Stats -> IO a
timeKeeper stats = forever $ do
logError "TICK"
Expand Down
107 changes: 4 additions & 103 deletions Network/Riak/Montage/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,11 @@ module Network.Riak.Montage.Process where
import Control.Monad (void, when)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar, MVar)
import Control.Concurrent.STM (newTVarIO)
import Control.Concurrent.STM.TVar (readTVar, writeTVar, TVar)
import Control.Concurrent.STM.TMVar (newEmptyTMVar, readTMVar, putTMVar, TMVar)
import Control.Concurrent.STM.Stats (trackNamedSTM)
import Control.Exception (finally, try, throw, SomeException)
import Control.Exception (try, throw, SomeException)
import Text.ProtocolBuffers.WireMessage (messageGet, messagePut)
import System.Timeout (timeout)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Maybe (fromJust)

import qualified Data.HashMap.Strict as HM
import Control.Applicative

import qualified Network.Riak.Types as RT
import Text.ProtocolBuffers.Basic (toUtf8, utf8)
import qualified Data.Text.Encoding as E
Expand Down Expand Up @@ -46,56 +38,6 @@ import qualified Network.Riak.Montage.Proto.Montage.MontageCommand as MC
import qualified Network.Riak.Montage.Proto.Montage.MontageCommandResponse as MCR
import qualified Network.Riak.Montage.Proto.Montage.MontageDelete as MD

-- | How many requests pass between two consecutive @{stats}@ log lines.
-- 'trackConcurrency' bumps a tick counter once per admitted request and
-- prints stats whenever that counter is a multiple of this value.
statsEvery :: Int
statsEvery = 100

-- | Shared, STM-backed state used to bound the number of concurrently
-- running requests ('trackConcurrency') and to let identical key reads
-- share a single execution ('pipelineGet').
data ConcurrentState = ConcurrentState {
    -- Number of requests currently running under 'trackConcurrency';
    -- incremented on admission and decremented when the request finishes.
    concurrentCount :: TVar Int
    -- Count of admitted requests; used modulo 'statsEvery' to decide when
    -- to emit a stats line.
    , tick :: TVar Int
    -- POSIX timestamp (as a Double) recorded when the last stats line was
    -- emitted; starts at 0 in 'newEmptyConcurrentState'.
    , ts :: TVar Double
    -- In-flight reads keyed by (bucket, key). Each entry holds a TMVar that
    -- receives the outcome (exception or response) of the running request.
    , pipeline :: TVar (HM.HashMap (RT.Bucket, RT.Key) (TMVar (Either SomeException CommandResponse)))
    }

-- | Allocate a fresh 'ConcurrentState': the three numeric cells start at
-- zero and the pipeline map starts empty.
newEmptyConcurrentState :: IO ConcurrentState
newEmptyConcurrentState = do
    running   <- newTVarIO 0
    ticks     <- newTVarIO 0
    lastStats <- newTVarIO 0
    inFlight  <- newTVarIO HM.empty
    return (ConcurrentState running ticks lastStats inFlight)

-- | Run a command, letting concurrent reads of the same key share one
-- execution.
--
-- For a 'ChainGet' whose third field is 'Nothing', the first caller for a
-- given (bucket, key) registers an empty 'TMVar' in the pipeline map, runs
-- the request through @tracker@, publishes the outcome (response or
-- exception) into that 'TMVar' and removes the map entry. Callers that
-- arrive while the entry exists do not run the request themselves: they
-- wait, under 'runWithTimeout', for the published outcome. In both cases a
-- captured exception is re-thrown to the caller.
--
-- Every other command is handed straight to @tracker@.
pipelineGet :: (MontageRiakValue t) => ConcurrentState -> ChainCommand t
            -> (Int -> IO CommandResponse -> IO CommandResponse)
            -> Int -> IO CommandResponse -> IO CommandResponse
pipelineGet state (ChainGet buck key Nothing) tracker requestTimeout' actuallyRun = do
    slot <- claimOrJoin
    outcome <- case slot of
        Right theirs -> do
            -- Somebody else is already fetching this key: wait for their answer.
            logError $ "(key request for " ++ (show buck) ++ "/" ++ (show key) ++ " is pipelined)"
            runWithTimeout requestTimeout' $ trackNamedSTM "pipelined" $ readTMVar theirs
        Left ours -> do
            -- We own the slot: run the request, capturing any exception so
            -- that waiters are always released.
            result <- try (tracker requestTimeout' actuallyRun)
            trackNamedSTM "non-pipelined" $ do
                putTMVar ours result
                inFlight <- readTVar (pipeline state)
                writeTVar (pipeline state) (HM.delete slotKey inFlight)
            return result
    either throw return (outcome :: Either SomeException CommandResponse)
  where
    -- Atomically either join an existing in-flight request (Right) or
    -- register a new, empty result cell for this key (Left).
    claimOrJoin = trackNamedSTM "eitherAnswerOrMandate" $ do
        inFlight <- readTVar (pipeline state)
        case HM.lookup slotKey inFlight of
            Nothing -> do
                fresh <- newEmptyTMVar
                writeTVar (pipeline state) (HM.insert slotKey fresh inFlight)
                return (Left fresh)
            Just existing -> return (Right existing)

    slotKey = (buck, key)

pipelineGet _ _ tracker requestTimeout' actuallyRun = tracker requestTimeout' actuallyRun

runWithTimeout :: Int -> IO a -> IO a
runWithTimeout requestTimeout' action = do
mr <- timeout requestTimeout action
Expand All @@ -107,45 +49,6 @@ runWithTimeout requestTimeout' action = do
where
requestTimeout = requestTimeout' * 1000000

-- | Run @action@ while holding one of at most @maxRequests'@ concurrency
-- slots, applying 'runWithTimeout' with @requestTimeout'@ to the action.
--
-- If no slot is free the call fails with @error "concurrency limit hit"@.
-- Once a slot has been taken it is always released when the protected
-- region exits, whether normally or by exception.
--
-- Every 'statsEvery' admitted requests a @{stats}@ line with the current
-- concurrency and request rate is written via 'logError'.
trackConcurrency :: ConcurrentState -> Int -> Int -> IO CommandResponse
                 -> IO CommandResponse
trackConcurrency state maxRequests' requestTimeout' action = do
    mcount <- maybeIncrCount
    case mcount of
        Just count ->
            -- The slot is already taken here, so the stats logging must run
            -- inside the 'finally'-protected region as well: previously an
            -- exception raised by 'logState' skipped 'decrCount' and leaked
            -- the slot for good.
            -- NOTE(review): an asynchronous exception delivered between
            -- 'maybeIncrCount' and this 'finally' could still leak a slot;
            -- closing that window would need mask/bracket.
            finally (logState count >> runWithTimeout requestTimeout' action) decrCount
        Nothing -> error "concurrency limit hit"
  where
    -- Atomically take a slot if one is free; returns the new count.
    maybeIncrCount = trackNamedSTM "maybeIncCount" $ do
        count <- readTVar (concurrentCount state)
        if (count < maxRequests')
            then (writeTVar (concurrentCount state) (count + 1) >> return (Just $ count + 1))
            else (return Nothing)

    -- Give the slot back.
    decrCount = trackNamedSTM "decrCount" $ do
        count <- readTVar (concurrentCount state)
        writeTVar (concurrentCount state) $ count - 1

    -- Bump the tick counter and, on every 'statsEvery'-th tick, log the
    -- current concurrency and the rate since the previous stats line.
    logState count = do
        now <- fmap realToFrac getPOSIXTime
        mlog <- trackNamedSTM "logState" $ do
            tick' <- fmap (+1) $ readTVar (tick state)
            writeTVar (tick state) tick'
            if tick' `mod` statsEvery == 0
                then do
                    last' <- readTVar (ts state)
                    writeTVar (ts state) now
                    return (Just last')
                else (return Nothing)
        case mlog of
            Just last' -> do
                let speed = (fromIntegral statsEvery) / (now - last') -- should never be /0
                logError ("{stats} concurrency=" ++ (show count)
                          ++ " rate=" ++ (show speed))
                --dumpSTMStats
            Nothing -> return ()

-- | Extract the payload of a 'Right'.
--
-- Partial: applied to a 'Left' it fails with
-- @error "fromRight got Left!"@.
fromRight :: Either a b -> b
fromRight = either (const (error "fromRight got Left!")) id
Expand Down Expand Up @@ -224,17 +127,15 @@ statsChainCustom :: (MontageRiakValue r) => ChainCommand r -> Stats -> IO ()
statsChainCustom (ChainCustom cmd _) stats = incCounter (TL.toStrict $ format "requests.custom[type={}]" (Only cmd)) stats
statsChainCustom _ _ = return ()

processRequest :: (MontageRiakValue r) => ConcurrentState -> PoolChooser -> ChainCommand r -> Stats -> Int -> Int -> Bool -> Bool -> IO CommandResponse
processRequest state chooser' cmd stats maxRequests' requestTimeout' readOnly' logCommands' = do
processRequest :: (MontageRiakValue r) => PoolChooser -> ChainCommand r -> Stats -> Int -> Bool -> Bool -> IO CommandResponse
processRequest chooser' cmd stats requestTimeout' readOnly' logCommands' = do
when (readOnly' && (not $ isRead cmd)) $
error "Non-read request issued to read-only montage"

when (logCommands') $
logError $ "Running command " ++ show cmd

pipelineGet state cmd tracker requestTimeout' (processRequest' chooser' cmd stats)
where
tracker = trackConcurrency state maxRequests'
runWithTimeout requestTimeout' (processRequest' chooser' cmd stats)

processRequest' :: (MontageRiakValue r) => PoolChooser -> ChainCommand r -> Stats -> IO CommandResponse
processRequest' chooser' cmd stats = do
Expand Down
51 changes: 31 additions & 20 deletions Network/Riak/Montage/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Network.Riak.Montage.Protocol where
import System.ZMQ
import System.UUID.V4 (uuid)
import Control.Monad (forever)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, takeMVar, putMVar)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Data.ByteString.Lazy as BW
import qualified Data.ByteString.Char8 as S
Expand All @@ -12,19 +12,20 @@ import Text.ProtocolBuffers.WireMessage (messageGet, messagePut)
import Text.ProtocolBuffers.Basic (uFromString)
import Data.Aeson (object, (.=))

import Network.Riak.Montage.Util

import Network.StatsWeb (Stats)
import Network.StatsWeb (Stats, incCounter)

import Network.Riak.Montage.Util
import Network.Riak.Montage.Types
import Network.Riak.Montage.Proto.Montage.MontageEnvelope as ME
import Network.Riak.Montage.Proto.Montage.MontageWireMessages
import Network.Riak.Montage.Proto.Montage.MontageError
import Network.Riak.Montage.Types
import Network.Riak.Montage.Process (processRequest,
serializeResponse, ConcurrentState(..))
serializeResponse)


type ZmqHandler = (S.ByteString -> ZmqCallback -> IO ())

type ZmqHandler = (S.ByteString -> (BW.ByteString -> IO ()) -> IO ())
type ZmqCallback = (BW.ByteString -> IO ())

runZmqRpc :: String
-> ZmqHandler
Expand All @@ -37,7 +38,7 @@ runZmqRpcWithContext :: Context
-> String
-> ZmqHandler
-> IO ()
runZmqRpcWithContext ctx binda call = do
runZmqRpcWithContext ctx binda serve = do
withSocket ctx Router (\s -> do
rand <- uuid
let inproc = "inproc://" ++ (show rand)
Expand All @@ -55,8 +56,7 @@ runZmqRpcWithContext ctx binda call = do
send s "" [SndMore]
send s m []
else do -- call
_ <- forkIO $ call m (zmqRpcReply ctx inproc zid)
return ()
serve m (zmqRpcReply ctx inproc zid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jamwt can you comment on why you originally designed this function to return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious as to why you think that change matters. @jamwt's version is semantically equivalent to void $ forkIO. Perhaps the first version of montage was trying to avoid "last statement in a 'do' block must be an expression" issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your new version doesn't forkIO though. It seems unnecessary to do it (since we're calling forkIO at the top level) but I was curious why it was done originally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now understand (from IRC).

)

zmqRpcReply :: Context
Expand All @@ -71,29 +71,40 @@ zmqRpcReply c inproc retid out = do
send s retid []
)

serveMontageZmq :: (MontageRiakValue r) =>
(MontageEnvelope -> ChainCommand r) ->
String -> ConcurrentState -> LogCallback ->
PoolChooser -> Stats -> Int -> Int -> Bool -> Bool -> IO ()
serveMontageZmq generate runOn state logCB chooser' stats maxRequests' requestTimeout' readOnly' logCommands' = do
runZmqRpc runOn wrapMontage
serveMontageZmq :: MVar (S.ByteString, ZmqCallback) -> String -> Stats -> IO ()
serveMontageZmq queueAny runOn stats = do
runZmqRpc runOn serveMontage
where
serveMontage m cb = do
putMVar queueAny (m, cb)
incCounter "requests" stats

processLoop :: (MontageRiakValue r) =>
MVar (S.ByteString, ZmqCallback) ->
(MontageEnvelope -> ChainCommand r) ->
LogCallback -> PoolChooser -> Stats ->
Int -> Bool -> Bool -> IO ()
processLoop queueAny generate logCB chooser' stats requestTimeout' readOnly' logCommands' = do
forever $ do
(req, cb) <- takeMVar queueAny
wrapMontage req >>= cb
where
wrapMontage m cb = do
wrapMontage m = do
case messageGet $ sTl m of
Right (env, x) | B.length x == 0 -> do
res <- try $ do
let !cmd = generate env
fmap (serializeResponse env) $ processRequest state chooser' cmd stats maxRequests' requestTimeout' readOnly' logCommands'
fmap (serializeResponse env) $ processRequest chooser' cmd stats requestTimeout' readOnly' logCommands'
case res of
Left (e :: SomeException) -> returnError (show e) $ msgid env
Right outenv -> cb $ messagePut outenv
Right outenv -> return . messagePut $ outenv

_ -> returnError "Failed to decode MontageEnvelope" Nothing
where
returnError err msgid' = do
logError err
logCB "EXCEPTION" Nothing $ object ["error" .= err ]
cb $ messagePut $ MontageEnvelope {
return . messagePut $ MontageEnvelope {
mtype = MONTAGE_ERROR
, msg = messagePut $ MontageError (uFromString err)
, msgid = msgid'
Expand Down
4 changes: 2 additions & 2 deletions montage.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: montage
Version: 0.2.9
Version: 0.3.4
Synopsis: Riak Resolution Proxy
Homepage: http://github.com/bumptech/montage
License: BSD3
Expand Down Expand Up @@ -51,7 +51,7 @@ Library
ListLike,
stm>=2.2,
riak-bump>=0.7.3.6,
bump-resource-pool,
bump-resource-pool==0.2.1.3,
unordered-containers >= 0.2.1 && < 0.3,
zeromq-haskell>=0.8 && < 0.9,
protocol-buffers >= 2.0.11,
Expand Down