Skip to content

Commit

Permalink
=str Fold InHandler and OutHandler for operator Scan.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 17, 2023
1 parent a7a49bd commit 0eed6a1
Showing 1 changed file with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,17 +424,10 @@ private[stream] object Collect {
import shape.{ in, out }

// Initial behavior makes sure that the zero gets flushed if upstream is empty
setHandler(out,
new OutHandler {
override def onPull(): Unit = {
push(out, aggregator)
setHandlers(in, out, self)
}
})

setHandler(
setHandlers(
in,
new InHandler {
out,
new InHandler with OutHandler {
override def onPush(): Unit = ()

override def onUpstreamFinish(): Unit =
Expand All @@ -445,6 +438,11 @@ private[stream] object Collect {
completeStage()
}
})

override def onPull(): Unit = {
push(out, aggregator)
setHandlers(in, out, self)
}
})

override def onPull(): Unit = pull(in)
Expand Down

0 comments on commit 0eed6a1

Please sign in to comment.