Skip to content

Commit

Permalink
Use a type family to provide the Delta type for incremental state.
Browse files Browse the repository at this point in the history
Saves passing around a bunch of parameters.
  • Loading branch information
dougalm committed Dec 15, 2023
1 parent 9c6ccba commit 535243c
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 96 deletions.
78 changes: 24 additions & 54 deletions src/lib/Actor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Actor (
ActorM, Actor (..), launchActor, send, selfMailbox, messageLoop,
sliceMailbox, SubscribeMsg (..), IncServer, IncServerT, FileWatcher,
StateServer, flushDiffs, handleSubscribeMsg, subscribe, subscribeIO, sendSync,
runIncServerT, launchFileWatcher, Mailbox, launchIncFunctionEvaluator
runIncServerT, launchFileWatcher, Mailbox,
) where

import Control.Concurrent
Expand All @@ -34,10 +34,10 @@ newtype ActorM msg a = ActorM { runActorM :: ReaderT (Chan msg) IO a }

newtype Mailbox a = Mailbox { sendToMailbox :: a -> IO () }

class (Show msg, MonadIO m) => Actor msg m | m -> msg where
class (MonadIO m) => Actor msg m | m -> msg where
selfChan :: m (Chan msg)

instance Show msg => Actor msg (ActorM msg) where
instance Actor msg (ActorM msg) where
selfChan = ActorM ask

instance Actor msg m => Actor msg (ReaderT r m) where selfChan = lift $ selfChan
Expand Down Expand Up @@ -94,104 +94,74 @@ sendSync mailbox msg = do

-- === Diff server ===

data IncServerState s d = IncServerState
{ subscribers :: [Mailbox d]
, bufferedUpdates :: d
data IncServerState s = IncServerState
{ subscribers :: [Mailbox (Delta s)]
, bufferedUpdates :: Delta s
, curIncState :: s }
deriving (Show, Generic)
deriving (Generic)

class (Monoid d, MonadIO m) => IncServer s d m | m -> s, m -> d where
getIncServerStateRef :: m (IORef (IncServerState s d))
class (IncState s, MonadIO m) => IncServer s m | m -> s where
getIncServerStateRef :: m (IORef (IncServerState s))

data SubscribeMsg s d = Subscribe (SyncMsg (Mailbox d) s) deriving (Show)
data SubscribeMsg s = Subscribe (SyncMsg (Mailbox (Delta s)) s) deriving (Show)

getIncServerState :: IncServer s d m => m (IncServerState s d)
getIncServerState :: IncServer s m => m (IncServerState s)
getIncServerState = readRef =<< getIncServerStateRef

updateIncServerState :: IncServer s d m => (IncServerState s d -> IncServerState s d) -> m ()
updateIncServerState :: IncServer s m => (IncServerState s -> IncServerState s) -> m ()
updateIncServerState f = do
ref <- getIncServerStateRef
prev <- readRef ref
writeRef ref $ f prev

handleSubscribeMsg :: IncServer s d m => SubscribeMsg s d -> m ()
handleSubscribeMsg :: IncServer s m => SubscribeMsg s -> m ()
handleSubscribeMsg (Subscribe (SyncMsg newSub response)) = do
flushDiffs
updateIncServerState \s -> s { subscribers = newSub : subscribers s }
curState <- curIncState <$> getIncServerState
setPromise response curState

flushDiffs :: IncServer s d m => m ()
flushDiffs :: IncServer s m => m ()
flushDiffs = do
d <- bufferedUpdates <$> getIncServerState
updateIncServerState \s -> s { bufferedUpdates = mempty }
subs <- subscribers <$> getIncServerState
-- TODO: consider testing for emptiness here
forM_ subs \sub -> send sub d

type StateServer s d = Mailbox (SubscribeMsg s d)
type StateServer s = Mailbox (SubscribeMsg s)

subscribe :: Actor msg m => (d -> msg) -> StateServer s d -> m s
subscribe :: (IncState s, Actor msg m) => (Delta s -> msg) -> StateServer s -> m s
subscribe inject server = do
updateChannel <- selfMailbox inject
sendSync (sliceMailbox Subscribe server) updateChannel

subscribeIO :: StateServer s d -> IO (s, Chan d)
subscribeIO :: IncState s => StateServer s -> IO (s, Chan (Delta s))
subscribeIO server = do
chan <- newChan
let mailbox = Mailbox (writeChan chan)
s <- sendSync (sliceMailbox Subscribe server) mailbox
return (s, chan)

newtype IncServerT s d m a = IncServerT { runIncServerT' :: ReaderT (Ref (IncServerState s d)) m a }
newtype IncServerT s m a = IncServerT { runIncServerT' :: ReaderT (Ref (IncServerState s)) m a }
deriving (Functor, Applicative, Monad, MonadIO, Actor msg, FreshNames name, MonadTrans)

instance (MonadIO m, IncState s d) => IncServer s d (IncServerT s d m) where
instance (MonadIO m, IncState s) => IncServer s (IncServerT s m) where
getIncServerStateRef = IncServerT ask

instance (MonadIO m, IncState s d) => DefuncState d (IncServerT s d m) where
instance (MonadIO m, IncState s, d ~ Delta s) => DefuncState d (IncServerT s m) where
update d = updateIncServerState \s -> s
{ bufferedUpdates = bufferedUpdates s <> d
, curIncState = curIncState s `applyDiff` d}

instance (MonadIO m, IncState s d) => LabelReader (SingletonLabel s) (IncServerT s d m) where
instance (MonadIO m, IncState s) => LabelReader (SingletonLabel s) (IncServerT s m) where
getl It = curIncState <$> getIncServerState

runIncServerT :: (MonadIO m, IncState s d) => s -> IncServerT s d m a -> m a
runIncServerT :: (MonadIO m, IncState s) => s -> IncServerT s m a -> m a
runIncServerT s cont = do
ref <- newRef $ IncServerState [] mempty s
runReaderT (runIncServerT' cont) ref

-- === Incremental function server ===

-- If you just need something that computes a function incrementally and doesn't
-- need to maintain any other state then this will do.

data IncFunctionEvaluatorMsg da b db =
Subscribe_IFEM (SubscribeMsg b db)
| Update_IFEM da
deriving (Show)

launchIncFunctionEvaluator
:: (IncState b db, Show da, MonadIO m)
=> StateServer a da
-> (a -> (b,s))
-> (b -> s -> da -> (db, s))
-> m (StateServer b db)
launchIncFunctionEvaluator server fInit fUpdate =
sliceMailbox Subscribe_IFEM <$> launchActor do
x0 <- subscribe Update_IFEM server
let (y0, s0) = fInit x0
flip evalStateT s0 $ runIncServerT y0 $ messageLoop \case
Subscribe_IFEM msg -> handleSubscribeMsg msg
Update_IFEM dx -> do
y <- getl It
s <- lift get
let (dy, s') = fUpdate y s dx
lift $ put s'
update dy
flushDiffs

-- === Refs ===
-- Just a wrapper around IORef lifted to `MonadIO`

Expand All @@ -218,14 +188,14 @@ launchClock intervalMicroseconds mailbox =
-- === File watcher ===

type SourceFileContents = Text
type FileWatcher = StateServer (Overwritable SourceFileContents) (Overwrite SourceFileContents)
type FileWatcher = StateServer (Overwritable SourceFileContents)

readFileContents :: MonadIO m => FilePath -> m Text
readFileContents path = liftIO $ T.decodeUtf8 <$> BS.readFile path

data FileWatcherMsg =
ClockSignal_FW ()
| Subscribe_FW (SubscribeMsg (Overwritable Text) (Overwrite Text))
| Subscribe_FW (SubscribeMsg (Overwritable Text))
deriving (Show)

launchFileWatcher :: MonadIO m => FilePath -> m FileWatcher
Expand Down
44 changes: 25 additions & 19 deletions src/lib/IncState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,37 @@ import Data.Aeson (ToJSON (..))
import qualified Data.Map.Strict as M
import GHC.Generics

-- === IncState ===
-- === Delta type family ===

class Monoid d => IncState s d where
applyDiff :: s -> d -> s
class Monoid (Delta s) => IncState s where
type Delta s :: *
applyDiff :: s -> Delta s -> s

-- === Diff utils ===

data MapEltUpdate s d =
data MapEltUpdate s =
Create s
| Replace s -- TODO: should we merge Create/Replace?
| Update d
| Update (Delta s)
| Delete
deriving (Eq, Functor, Show, Generic)
deriving (Generic)

newtype MapUpdate k s d = MapUpdate { mapUpdates :: M.Map k (MapEltUpdate s d) }
deriving (Functor, Show, Generic)
newtype MapUpdate k s = MapUpdate { mapUpdates :: M.Map k (MapEltUpdate s) }

mapUpdateMapWithKey :: MapUpdate k s d -> (k -> s -> s') -> (k -> d -> d') -> MapUpdate k s' d'
mapUpdateMapWithKey
:: (IncState s, IncState s')
=> MapUpdate k s -> (k -> s -> s') -> (k -> Delta s -> Delta s') -> MapUpdate k s'
mapUpdateMapWithKey (MapUpdate m) fs fd =
MapUpdate $ flip M.mapWithKey m \k v -> case v of
Create s -> Create $ fs k s
Replace s -> Replace $ fs k s
Update d -> Update $ fd k d
Delete -> Delete

instance (IncState s d, Ord k) => Monoid (MapUpdate k s d) where
instance (IncState s, Ord k) => Monoid (MapUpdate k s) where
mempty = MapUpdate mempty

instance (IncState s d, Ord k) => Semigroup (MapUpdate k s d) where
instance (IncState s, Ord k) => Semigroup (MapUpdate k s) where
MapUpdate m1 <> MapUpdate m2 = MapUpdate $
M.mapMaybe id (M.intersectionWith combineElts m1 m2)
<> M.difference m1 m2
Expand All @@ -70,7 +72,8 @@ instance (IncState s d, Ord k) => Semigroup (MapUpdate k s d) where
Update _ -> error "shouldn't be updating a node that doesn't exist"
Delete -> error "shouldn't be deleting a node that doesn't exist"

instance (IncState s d, Ord k) => IncState (M.Map k s) (MapUpdate k s d) where
instance (IncState s, Ord k) => IncState (M.Map k s) where
type Delta (M.Map k s) = MapUpdate k s
applyDiff m (MapUpdate updates) =
M.mapMaybe id (M.intersectionWith applyEltUpdate m updates)
<> M.difference m updates
Expand Down Expand Up @@ -101,7 +104,8 @@ instance Semigroup (TailUpdate a) where
instance Monoid (TailUpdate a) where
mempty = TailUpdate 0 []

instance IncState [a] (TailUpdate a) where
instance IncState [a] where
type Delta [a] = TailUpdate a
applyDiff xs (TailUpdate numDrop ys) = take (length xs - numDrop) xs <> ys

-- Trivial diff that works for any type - just replace the old value with a completely new one.
Expand All @@ -117,26 +121,28 @@ instance Semigroup (Overwrite a) where
instance Monoid (Overwrite a) where
mempty = NoChange

instance IncState (Overwritable a) (Overwrite a) where
instance IncState (Overwritable a) where
type Delta (Overwritable a) = Overwrite a
applyDiff s = \case
NoChange -> s
OverwriteWith s' -> Overwritable s'

-- Case when the diff and the state are the same
newtype MonoidState a = MonoidState a

instance Monoid a => IncState (MonoidState a) a where
instance Monoid a => IncState (MonoidState a) where
type Delta (MonoidState a) = a
applyDiff (MonoidState d) d' = MonoidState $ d <> d'


-- Trivial diff that works for any type - just replace the old value with a completely new one.
newtype Unchanging a = Unchanging { fromUnchanging :: a } deriving (Show, Eq, Ord)

instance IncState (Unchanging a) () where
instance IncState (Unchanging a) where
type Delta (Unchanging a) = ()
applyDiff s () = s

instance ToJSON a => ToJSON (Overwrite a)
instance (ToJSON k, ToJSON s, ToJSON d) => ToJSON (MapUpdate k s d) where
instance (ToJSON k, ToJSON s, ToJSON (Delta s)) => ToJSON (MapUpdate k s) where
toJSON m = toJSON $ M.toList $ mapUpdates m
instance ToJSON a => ToJSON (TailUpdate a)
instance (ToJSON s, ToJSON d) => ToJSON (MapEltUpdate s d)
instance (ToJSON s, ToJSON (Delta s)) => ToJSON (MapEltUpdate s)
43 changes: 22 additions & 21 deletions src/lib/Live/Eval.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import RenderHtml

-- === Top-level interface ===

type EvalServer = StateServer CellsState CellsUpdate
type EvalServer = StateServer CellsState

-- `watchAndEvalFile` returns the channel by which a client may
-- subscribe by sending a write-only view of its input channel.
Expand Down Expand Up @@ -63,25 +63,26 @@ data NodeList a = NodeList
, nodeMap :: M.Map NodeId a }
deriving (Show, Generic, Functor)

data NodeListUpdate s d = NodeListUpdate
data NodeListUpdate s = NodeListUpdate
{ orderedNodesUpdate :: TailUpdate NodeId
, nodeMapUpdate :: MapUpdate NodeId s d }
deriving (Show, Generic)
, nodeMapUpdate :: MapUpdate NodeId s }
deriving (Generic)

instance IncState s d => Semigroup (NodeListUpdate s d) where
instance IncState s => Semigroup (NodeListUpdate s) where
NodeListUpdate x1 y1 <> NodeListUpdate x2 y2 = NodeListUpdate (x1<>x2) (y1<>y2)

instance IncState s d => Monoid (NodeListUpdate s d) where
instance IncState s => Monoid (NodeListUpdate s) where
mempty = NodeListUpdate mempty mempty

instance IncState s d => IncState (NodeList s) (NodeListUpdate s d) where
instance IncState s => IncState (NodeList s) where
type Delta (NodeList s) = NodeListUpdate s
applyDiff (NodeList m xs) (NodeListUpdate dm dxs) =
NodeList (applyDiff m dm) (applyDiff xs dxs)

type Dag a = NodeList (Unchanging a)
type DagUpdate a = NodeListUpdate (Unchanging a) ()
type DagUpdate a = NodeListUpdate (Unchanging a)

nodeListAsUpdate :: NodeList s -> NodeListUpdate s d
nodeListAsUpdate :: NodeList s -> NodeListUpdate s
nodeListAsUpdate (NodeList xs m)= NodeListUpdate (TailUpdate 0 xs) (MapUpdate $ fmap Create m)

emptyNodeList :: NodeList a
Expand All @@ -101,7 +102,7 @@ commonPrefixLength _ _ = 0
nodeListVals :: NodeList a -> [a]
nodeListVals nodes = orderedNodes nodes <&> \k -> fromJust $ M.lookup k (nodeMap nodes)

computeNodeListUpdate :: (Eq s, FreshNames NodeId m) => NodeList s -> [s] -> m (NodeListUpdate s d)
computeNodeListUpdate :: (Eq s, FreshNames NodeId m) => NodeList s -> [s] -> m (NodeListUpdate s)
computeNodeListUpdate nodes newVals = do
let prefixLength = commonPrefixLength (nodeListVals nodes) newVals
let oldTail = drop prefixLength $ orderedNodes nodes
Expand All @@ -114,10 +115,10 @@ computeNodeListUpdate nodes newVals = do
-- This coarsely parses the full file into blocks and forms a DAG (for now a
-- trivial one assuming all top-to-bottom dependencies) of the results.

type CellParser = StateServer (Dag SourceBlock) (DagUpdate SourceBlock)
type CellParser = StateServer (Dag SourceBlock)

data CellParserMsg =
Subscribe_CP (SubscribeMsg (Dag SourceBlock) (DagUpdate SourceBlock))
Subscribe_CP (SubscribeMsg (Dag SourceBlock))
| Update_CP (Overwrite Text)
deriving (Show)

Expand All @@ -143,22 +144,23 @@ cellParserImpl fileWatcher parseCells = runFreshNameT do
-- This is where we track the state of evaluation and decide what we needs to be
-- run and what needs to be killed.

type Evaluator = StateServer CellsState CellsUpdate
type Evaluator = StateServer CellsState
newtype EvaluatorM a =
EvaluatorM { runEvaluatorM' ::
IncServerT CellsState CellsUpdate
IncServerT CellsState
(StateT EvaluatorState
(ActorM EvaluatorMsg)) a }
deriving (Functor, Applicative, Monad, MonadIO, Actor (EvaluatorMsg))
deriving instance IncServer CellsState CellsUpdate EvaluatorM
deriving instance IncServer CellsState EvaluatorM

instance Semigroup CellUpdate where
CellUpdate s o <> CellUpdate s' o' = CellUpdate (s<>s') (o<>o')

instance Monoid CellUpdate where
mempty = CellUpdate mempty mempty

instance IncState CellState CellUpdate where
instance IncState CellState where
type Delta CellState = CellUpdate
applyDiff (CellState source status result) (CellUpdate status' result') =
CellState source (fromOverwritable (applyDiff (Overwritable status) status')) (result <> result')

Expand All @@ -182,7 +184,7 @@ instance LabelReader EvaluatorMLabel EvaluatorM where
EvalCfg -> EvaluatorM $ lift $ evaluatorCfg <$> get

data EvaluatorMUpdate =
UpdateDagEU (NodeListUpdate CellState CellUpdate)
UpdateDagEU (NodeListUpdate CellState)
| UpdateCellState NodeId CellUpdate
| UpdateCurJob CurJobStatus
| UpdateEnvs [TopStateEx]
Expand Down Expand Up @@ -219,7 +221,7 @@ data CellState = CellState SourceBlockWithId CellStatus Outputs
data CellUpdate = CellUpdate (Overwrite CellStatus) Outputs deriving (Show, Generic)

type CellsState = NodeList CellState
type CellsUpdate = NodeListUpdate CellState CellUpdate
type CellsUpdate = NodeListUpdate CellState

type CellIndex = Int -- index in the list of cells, not the NodeId

Expand All @@ -231,8 +233,7 @@ data JobUpdate =
data EvaluatorMsg =
SourceUpdate (DagUpdate SourceBlock)
| JobUpdate JobId JobUpdate
| Subscribe_E (SubscribeMsg CellsState CellsUpdate)
deriving (Show)
| Subscribe_E (SubscribeMsg CellsState)

initEvaluatorState :: EvalConfig -> TopStateEx -> EvaluatorState
initEvaluatorState cfg s = EvaluatorState cfg [s] Nothing
Expand Down Expand Up @@ -362,4 +363,4 @@ initCellState cellId source = do
instance ToJSON CellState where
instance ToJSON CellStatus
instance ToJSON CellUpdate
instance (ToJSON s, ToJSON d) => ToJSON (NodeListUpdate s d)
instance (IncState s, ToJSON s, ToJSON (Delta s)) => ToJSON (NodeListUpdate s)
Loading

0 comments on commit 535243c

Please sign in to comment.