diff --git a/effect/src/main/scala/korolev/effect/io/FileIO.scala b/effect/src/main/scala/korolev/effect/io/FileIO.scala index 7b279d86..aba86ad1 100644 --- a/effect/src/main/scala/korolev/effect/io/FileIO.scala +++ b/effect/src/main/scala/korolev/effect/io/FileIO.scala @@ -26,7 +26,7 @@ object FileIO { def readBytes[F[_]: Effect](path: Path): F[LazyBytes[F]] = { val inputStream = new FileInputStream(path.toFile) - LazyBytes(inputStream) + LazyBytes.fromInputStream(inputStream) } def readLines[F[_]: Effect](path: Path): F[Stream[F, String]] = { diff --git a/effect/src/main/scala/korolev/effect/io/LazyBytes.scala b/effect/src/main/scala/korolev/effect/io/LazyBytes.scala index 2150bfde..709b8902 100644 --- a/effect/src/main/scala/korolev/effect/io/LazyBytes.scala +++ b/effect/src/main/scala/korolev/effect/io/LazyBytes.scala @@ -70,7 +70,7 @@ object LazyBytes { new LazyBytes(Stream.eval(bytes), Some(bytes.length.toLong)) } - def apply[F[_] : Effect](inputStream: InputStream, chunkSize: Int = 8192): F[LazyBytes[F]] = { + def fromInputStream[F[_] : Effect](inputStream: InputStream, chunkSize: Int = 8192): F[LazyBytes[F]] = { @tailrec def readStream(chunk: Array[Byte], offset: Int, len: Int): (Unit, Option[Array[Byte]]) = { val read = inputStream.read(chunk, offset, len) diff --git a/effect/src/test/scala/korolev/effect/io/LazyBytesSpec.scala b/effect/src/test/scala/korolev/effect/io/LazyBytesSpec.scala new file mode 100644 index 00000000..91819ea6 --- /dev/null +++ b/effect/src/test/scala/korolev/effect/io/LazyBytesSpec.scala @@ -0,0 +1,17 @@ +package korolev.effect.io + +import java.io.ByteArrayInputStream + +import org.scalatest.{AsyncFlatSpec, Matchers} + +class LazyBytesSpec extends AsyncFlatSpec with Matchers { + + final val inputStream1Length = 239978 + final val inputStream1 = new ByteArrayInputStream(Array.fill[Byte](inputStream1Length)(1)) + + "LazyBytes.fromInputStream" should "return exactly same bytes as contains in InputStream" in { + LazyBytes.fromInputStream(inputStream1).flatMap(_.toStrict).map { bytes => + bytes.length shouldEqual inputStream1Length + } + } +} diff --git a/interop/akka/src/main/scala/korolev/akka/Converters.scala b/interop/akka/src/main/scala/korolev/akka/Converters.scala index e6f8d2da..342ead2d 100644 --- a/interop/akka/src/main/scala/korolev/akka/Converters.scala +++ b/interop/akka/src/main/scala/korolev/akka/Converters.scala @@ -17,6 +17,7 @@ package korolev.akka import akka.NotUsed +import akka.stream.{Attributes, OverflowStrategy} import akka.stream.scaladsl.{Sink, Source} import korolev.akka.util.{KorolevStreamPublisher, KorolevStreamSubscriber} import korolev.effect.{Effect, Stream} @@ -59,7 +60,9 @@ object Converters { def asAkkaSource: Source[T, NotUsed] = { val publisher = new KorolevStreamPublisher(stream, fanout = false) - Source.fromPublisher(publisher) + Source + .fromPublisher(publisher) + .buffer(10, OverflowStrategy.backpressure) // FIXME should work without this line. Looks like bug in akka-streams } } } diff --git a/interop/akka/src/main/scala/korolev/akka/util/KorolevStreamPublisher.scala b/interop/akka/src/main/scala/korolev/akka/util/KorolevStreamPublisher.scala index 4ecd7305..c051a57f 100644 --- a/interop/akka/src/main/scala/korolev/akka/util/KorolevStreamPublisher.scala +++ b/interop/akka/src/main/scala/korolev/akka/util/KorolevStreamPublisher.scala @@ -38,6 +38,8 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T], @volatile var pending: Effect.Promise[Unit] = _ val res = Right(()) + def value(): F[Long] = Effect[F].delay(n) + def decOrLock(): F[Unit] = Effect[F].promise[Unit] { cb => this.synchronized { // FIXME use CAS? if (n == 0) { @@ -49,7 +51,7 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T], } } - def setUnsafe(x: Long): Unit = + def unsafeSet(x: Long): Unit = this.synchronized { n = x if (x > 0 && pending != null) { @@ -79,10 +81,13 @@ final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T], } } yield () - loop().runAsyncForget + loop().runAsync { + case Left(error) => subscriber.onError(error) + case Right(_) => () + } def request(n: Long): Unit = { - counter.setUnsafe(n) + counter.unsafeSet(n) } def cancel(): Unit = { diff --git a/korolev/src/main/scala/korolev/server/internal/services/FilesService.scala b/korolev/src/main/scala/korolev/server/internal/services/FilesService.scala index f512114d..f12bba35 100644 --- a/korolev/src/main/scala/korolev/server/internal/services/FilesService.scala +++ b/korolev/src/main/scala/korolev/server/internal/services/FilesService.scala @@ -41,7 +41,7 @@ private[korolev] final class FilesService[F[_]: Effect](commonService: CommonSer case Some(mimeType) => Seq(Headers.ContentType -> mimeType) case None => Nil } - LazyBytes(javaSyncStream) map { lazyBytes => + LazyBytes.fromInputStream(javaSyncStream) map { lazyBytes => Response(Response.Status.Ok, lazyBytes, headers) } }