Skip to content
This repository has been archived by the owner. It is now read-only.

Commit

Permalink
Fix 'Input buffer overrun' exception on static files larger than 128KB
Browse files Browse the repository at this point in the history
  • Loading branch information
fomkin committed May 21, 2020
1 parent 3ecb9da commit ff449e3
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 7 deletions.
2 changes: 1 addition & 1 deletion effect/src/main/scala/korolev/effect/io/FileIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object FileIO {

def readBytes[F[_]: Effect](path: Path): F[LazyBytes[F]] = {
val inputStream = new FileInputStream(path.toFile)
LazyBytes(inputStream)
LazyBytes.fromInputStream(inputStream)
}

def readLines[F[_]: Effect](path: Path): F[Stream[F, String]] = {
Expand Down
2 changes: 1 addition & 1 deletion effect/src/main/scala/korolev/effect/io/LazyBytes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object LazyBytes {
new LazyBytes(Stream.eval(bytes), Some(bytes.length.toLong))
}

def apply[F[_] : Effect](inputStream: InputStream, chunkSize: Int = 8192): F[LazyBytes[F]] = {
def fromInputStream[F[_] : Effect](inputStream: InputStream, chunkSize: Int = 8192): F[LazyBytes[F]] = {
@tailrec
def readStream(chunk: Array[Byte], offset: Int, len: Int): (Unit, Option[Array[Byte]]) = {
val read = inputStream.read(chunk, offset, len)
Expand Down
17 changes: 17 additions & 0 deletions effect/src/test/scala/korolev/effect/io/LazyBytesSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package korolev.effect.io

import java.io.ByteArrayInputStream

import org.scalatest.{AsyncFlatSpec, Matchers}

class LazyBytesSpec extends AsyncFlatSpec with Matchers {

final val inputStream1Length = 239978
final val inputStream1 = new ByteArrayInputStream(Array.fill[Byte](inputStream1Length)(1))

"LazyBytes.fromInputStream" should "return exactly same bytes as contains in InputStream" in {
LazyBytes.fromInputStream(inputStream1).flatMap(_.toStrict).map { bytes =>
bytes.length shouldEqual inputStream1Length
}
}
}
5 changes: 4 additions & 1 deletion interop/akka/src/main/scala/korolev/akka/Converters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package korolev.akka

import akka.NotUsed
import akka.stream.{Attributes, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import korolev.akka.util.{KorolevStreamPublisher, KorolevStreamSubscriber}
import korolev.effect.{Effect, Stream}
Expand Down Expand Up @@ -59,7 +60,9 @@ object Converters {

def asAkkaSource: Source[T, NotUsed] = {
val publisher = new KorolevStreamPublisher(stream, fanout = false)
Source.fromPublisher(publisher)
Source
.fromPublisher(publisher)
.buffer(10, OverflowStrategy.backpressure) // FIXME should work without this line. Looks like bug in akka-streams
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T],
@volatile var pending: Effect.Promise[Unit] = _
val res = Right(())

def value(): F[Long] = Effect[F].delay(n)

def decOrLock(): F[Unit] = Effect[F].promise[Unit] { cb =>
this.synchronized { // FIXME use CAS?
if (n == 0) {
Expand All @@ -49,7 +51,7 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T],
}
}

def setUnsafe(x: Long): Unit =
def unsafeSet(x: Long): Unit =
this.synchronized {
n = x
if (x > 0 && pending != null) {
Expand Down Expand Up @@ -79,10 +81,13 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T],
}
} yield ()

loop().runAsyncForget
loop().runAsync {
case Left(error) => subscriber.onError(error)
case Right(_) => ()
}

def request(n: Long): Unit = {
counter.setUnsafe(n)
counter.unsafeSet(n)
}

def cancel(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[korolev] final class FilesService[F[_]: Effect](commonService: CommonSer
case Some(mimeType) => Seq(Headers.ContentType -> mimeType)
case None => Nil
}
LazyBytes(javaSyncStream) map { lazyBytes =>
LazyBytes.fromInputStream(javaSyncStream) map { lazyBytes =>
Response(Response.Status.Ok, lazyBytes, headers)
}
}
Expand Down

0 comments on commit ff449e3

Please sign in to comment.