diff --git a/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala b/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala index f50ed09649..d3d30b9e97 100644 --- a/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala +++ b/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala @@ -50,7 +50,7 @@ private[cli] object Retriever { override def retrieve(): Task[FormField] = for { - chunk <- Body.fromFile(new java.io.File(path.toUri())).asChunk + chunk <- Body.fromFile(new java.io.File(path.toUri())).flatMap(_.asChunk) } yield FormField.binaryField(name, chunk, media) } diff --git a/zio-http-example/src/main/scala/example/FileStreaming.scala b/zio-http-example/src/main/scala/example/FileStreaming.scala index 5bfc676269..dbfd6b9f93 100644 --- a/zio-http-example/src/main/scala/example/FileStreaming.scala +++ b/zio-http-example/src/main/scala/example/FileStreaming.scala @@ -17,7 +17,7 @@ object FileStreaming extends ZIOAppDefault { // Read the file as ZStream // Uses the blocking version of ZStream.fromFile - Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))), + Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))), // Uses netty's capability to write file content to the Channel // Content-type response headers are automatically identified and added diff --git a/zio-http-example/src/main/scala/example/RequestStreaming.scala b/zio-http-example/src/main/scala/example/RequestStreaming.scala index 5535bcb4a4..bd8166a29c 100644 --- a/zio-http-example/src/main/scala/example/RequestStreaming.scala +++ b/zio-http-example/src/main/scala/example/RequestStreaming.scala @@ -14,7 +14,7 @@ object RequestStreaming extends ZIOAppDefault { // Creating HttpData from the stream // This works for file of any size - val data = Body.fromStream(stream) + val data = Body.fromStreamChunked(stream) Response(body = data) }).toHttpApp diff --git a/zio-http-example/src/main/scala/example/StreamingResponse.scala b/zio-http-example/src/main/scala/example/StreamingResponse.scala index 2af65b16b9..e884b06a6f 100644 --- a/zio-http-example/src/main/scala/example/StreamingResponse.scala +++ b/zio-http-example/src/main/scala/example/StreamingResponse.scala @@ -25,8 +25,7 @@ object StreamingResponse extends ZIOAppDefault { handler( http.Response( status = Status.Ok, - headers = Headers(Header.ContentLength(message.length.toLong)), - body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream + body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream ), ), ).toHttpApp diff --git a/zio-http/src/main/scala/zio/http/Body.scala b/zio-http/src/main/scala/zio/http/Body.scala index cf64927a2a..ce5e1b6281 100644 --- a/zio-http/src/main/scala/zio/http/Body.scala +++ b/zio-http/src/main/scala/zio/http/Body.scala @@ -16,7 +16,7 @@ package zio.http -import java.io.FileInputStream +import java.io.{FileInputStream, IOException} import java.nio.charset._ import java.nio.file._ @@ -120,6 +120,11 @@ trait Body { self => */ def isComplete: Boolean + /** + * Returns whether or not the content length is known + */ + def knownContentLength: Option[Long] + /** * Returns whether or not the body is known to be empty. Note that some bodies * may not be known to be empty until an attempt is made to consume them. @@ -167,8 +172,10 @@ object Body { /** * Constructs a [[zio.http.Body]] from the contents of a file. */ - def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4): Body = - FileBody(file, chunkSize) + def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] = + ZIO.succeed(file.length()).map { fileSize => + FileBody(file, chunkSize, fileSize) + } /** * Constructs a [[zio.http.Body]] from from form data, using multipart @@ -180,7 +187,7 @@ object Body { )(implicit trace: Trace): Body = { val bytes = form.multipartBytes(specificBoundary) - StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary)) + StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(specificBoundary)) } /** @@ -192,26 +199,48 @@ object Body { form: Form, )(implicit trace: Trace): UIO[Body] = form.multipartBytesUUID.map { case (boundary, bytes) => - StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary)) + StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(boundary)) } /** - * Constructs a [[zio.http.Body]] from a stream of bytes. + * Constructs a [[zio.http.Body]] from a stream of bytes with a known length. + */ + def fromStream(stream: ZStream[Any, Throwable, Byte], contentLength: Long): Body = + StreamBody(stream, knownContentLength = Some(contentLength)) + + /** + * Constructs a [[zio.http.Body]] from a stream of bytes of unknown length, + * using chunked transfer encoding. */ - def fromStream(stream: ZStream[Any, Throwable, Byte]): Body = - StreamBody(stream) + def fromStreamChunked(stream: ZStream[Any, Throwable, Byte]): Body = + StreamBody(stream, knownContentLength = None) /** - * Constructs a [[zio.http.Body]] from a stream of text, using the specified - * character set, which defaults to the HTTP character set. + * Constructs a [[zio.http.Body]] from a stream of text with known length, + * using the specified character set, which defaults to the HTTP character + * set. */ def fromCharSequenceStream( stream: ZStream[Any, Throwable, CharSequence], + contentLength: Long, charset: Charset = Charsets.Http, )(implicit trace: Trace, ): Body = - fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks) + fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks, contentLength) + + /** + * Constructs a [[zio.http.Body]] from a stream of text with unknown length + * using chunked transfer encoding, using the specified character set, which + * defaults to the HTTP character set. + */ + def fromCharSequenceStreamChunked( + stream: ZStream[Any, Throwable, CharSequence], + charset: Charset = Charsets.Http, + )(implicit + trace: Trace, + ): Body = + fromStreamChunked(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks) /** * Helper to create Body from String @@ -262,6 +291,8 @@ object Body { override def contentType(newMediaType: MediaType): Body = EmptyBody override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = EmptyBody + + override def knownContentLength: Option[Long] = Some(0L) } private[zio] final case class ChunkBody( @@ -291,11 +322,14 @@ object Body { override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary))) + + override def knownContentLength: Option[Long] = Some(data.length.toLong) } private[zio] final case class FileBody( - val file: java.io.File, + file: java.io.File, chunkSize: Int = 1024 * 4, + fileSize: Long, override val mediaType: Option[MediaType] = None, override val boundary: Option[Boundary] = None, ) extends Body @@ -339,10 +373,13 @@ object Body { override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary))) + + override def knownContentLength: Option[Long] = Some(fileSize) } private[zio] final case class StreamBody( stream: ZStream[Any, Throwable, Byte], + knownContentLength: Option[Long], override val mediaType: Option[MediaType] = None, override val boundary: Option[Boundary] = None, ) extends Body { @@ -385,6 +422,8 @@ object Body { def contentType(newMediaType: zio.http.MediaType, newBoundary: zio.http.Boundary): zio.http.Body = this + override def knownContentLength: Option[Long] = Some(0L) + } private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty) diff --git a/zio-http/src/main/scala/zio/http/Handler.scala b/zio-http/src/main/scala/zio/http/Handler.scala index 851a8be2af..8556e5d1fb 100644 --- a/zio-http/src/main/scala/zio/http/Handler.scala +++ b/zio-http/src/main/scala/zio/http/Handler.scala @@ -832,17 +832,18 @@ object Handler { ZIO.fail(new AccessDeniedException(file.getAbsolutePath)) } else { if (file.isFile) { - val length = Headers(Header.ContentLength(file.length())) - val response = http.Response(headers = length, body = Body.fromFile(file)) - val pathName = file.toPath.toString - - // Set MIME type in the response headers. This is only relevant in - // case of RandomAccessFile transfers as browsers use the MIME type, - // not the file extension, to determine how to process a URL. - // {{{https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type}}} - determineMediaType(pathName) match { - case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType))) - case None => ZIO.succeed(response) + Body.fromFile(file).flatMap { body => + val response = http.Response(body = body) + val pathName = file.toPath.toString + + // Set MIME type in the response headers. This is only relevant in + // case of RandomAccessFile transfers as browsers use the MIME type, + // not the file extension, to determine how to process a URL. + // {{{https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type}}} + determineMediaType(pathName) match { + case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType))) + case None => ZIO.succeed(response) + } } } else { ZIO.fail(new NotDirectoryException(s"Found directory instead of a file.")) @@ -897,11 +898,10 @@ object Handler { .acquireReleaseWith(openZip)(closeZip) .mapZIO(jar => ZIO.attemptBlocking(jar.getEntry(resourcePath) -> jar)) .flatMap { case (entry, jar) => ZStream.fromInputStream(jar.getInputStream(entry)) } - response = Response(body = Body.fromStream(inZStream)) + response = Response(body = Body.fromStream(inZStream, contentLength)) } yield mediaType.fold(response) { t => response .addHeader(Header.ContentType(t)) - .addHeader(Header.ContentLength(contentLength)) } } } @@ -913,27 +913,53 @@ object Handler { /** * Creates a Handler that always succeeds with a 200 status code and the - * provided ZStream as the body + * provided ZStream with a known content length as the body */ - def fromStream[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit + def fromStream[R](stream: ZStream[R, Throwable, String], contentLength: Long, charset: Charset = Charsets.Http)( + implicit trace: Trace, + ): Handler[R, Throwable, Any, Response] = + Handler.fromZIO { + ZIO.environment[R].map { env => + fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), contentLength, charset)) + } + }.flatten + + /** + * Creates a Handler that always succeeds with a 200 status code and the + * provided ZStream with a known content length as the body + */ + def fromStream[R](stream: ZStream[R, Throwable, Byte], contentLength: Long)(implicit + trace: Trace, + ): Handler[R, Throwable, Any, Response] = + Handler.fromZIO { + ZIO.environment[R].map { env => + fromBody(Body.fromStream(stream.provideEnvironment(env), contentLength)) + } + }.flatten + + /** + * Creates a Handler that always succeeds with a 200 status code and the + * provided ZStream as the body using chunked transfer encoding + */ + def fromStreamChunked[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit trace: Trace, ): Handler[R, Throwable, Any, Response] = Handler.fromZIO { ZIO.environment[R].map { env => - fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), charset)) + fromBody(Body.fromCharSequenceStreamChunked(stream.provideEnvironment(env), charset)) } }.flatten /** * Creates a Handler that always succeeds with a 200 status code and the - * provided ZStream as the body + * provided ZStream as the body using chunked transfer encoding */ - def fromStream[R](stream: ZStream[R, Throwable, Byte])(implicit + def fromStreamChunked[R](stream: ZStream[R, Throwable, Byte])(implicit trace: Trace, ): Handler[R, Throwable, Any, Response] = Handler.fromZIO { ZIO.environment[R].map { env => - fromBody(Body.fromStream(stream.provideEnvironment(env))) + fromBody(Body.fromStreamChunked(stream.provideEnvironment(env))) } }.flatten diff --git a/zio-http/src/main/scala/zio/http/Response.scala b/zio-http/src/main/scala/zio/http/Response.scala index bba283a2f8..0cb0b52e69 100644 --- a/zio-http/src/main/scala/zio/http/Response.scala +++ b/zio-http/src/main/scala/zio/http/Response.scala @@ -181,7 +181,7 @@ object Response { * \- stream of data to be sent as Server Sent Events */ def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response = - Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStream(data.map(_.encode))) + Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode))) /** * Creates a new response for the provided socket app diff --git a/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala b/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala index a03465f2f3..4d09af86f2 100644 --- a/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala +++ b/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala @@ -143,10 +143,10 @@ private[http] object BodyCodec { ZIO.succeed((body.asStream >>> ZPipeline.decodeCharsWith(Charset.defaultCharset()) >>> codec.streamDecoder).orDie) def encodeToBody(value: ZStream[Any, Nothing, E], codec: BinaryCodec[E])(implicit trace: Trace): Body = - Body.fromStream(value >>> codec.streamEncoder) + Body.fromStreamChunked(value >>> codec.streamEncoder) def encodeToBody(value: ZStream[Any, Nothing, E], codec: Codec[String, Char, E])(implicit trace: Trace): Body = - Body.fromStream(value >>> codec.streamEncoder.map(_.toByte)) + Body.fromStreamChunked(value >>> codec.streamEncoder.map(_.toByte)) type Element = E } diff --git a/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala b/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala index 6fee2f52d4..bb7afab795 100644 --- a/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala +++ b/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala @@ -530,13 +530,15 @@ private[codec] object EncoderDecoder { } else None private def encodeBody(inputs: Array[Any], contentType: => Header.ContentType): Body = { if (isByteStream) { - Body.fromStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]]) + Body.fromStreamChunked(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]]) } else { if (inputs.length > 1) { Body.fromMultipartForm(encodeMultipartFormData(inputs), formBoundary) } else { if (isEventStream) { - Body.fromCharSequenceStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode)) + Body.fromCharSequenceStreamChunked( + inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode), + ) } else if (inputs.length < 1) { Body.empty } else { diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala index cac6af0a5c..5cd83ac9d4 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala @@ -39,8 +39,14 @@ object NettyBody extends BodyEncoding { private[zio] def fromAsync( unsafeAsync: UnsafeAsync => Unit, + knownContentLength: Option[Long], contentTypeHeader: Option[Header.ContentType] = None, - ): Body = AsyncBody(unsafeAsync, contentTypeHeader.map(_.mediaType), contentTypeHeader.flatMap(_.boundary)) + ): Body = AsyncBody( + unsafeAsync, + knownContentLength, + contentTypeHeader.map(_.mediaType), + contentTypeHeader.flatMap(_.boundary), + ) /** * Helper to create Body from ByteBuf @@ -79,6 +85,8 @@ object NettyBody extends BodyEncoding { override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary))) + + override def knownContentLength: Option[Long] = Some(asciiString.length().toLong) } private[zio] final case class ByteBufBody( @@ -109,10 +117,13 @@ object NettyBody extends BodyEncoding { override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary))) + + override def knownContentLength: Option[Long] = Some(byteBuf.readableBytes().toLong) } private[zio] final case class AsyncBody( unsafeAsync: UnsafeAsync => Unit, + knownContentLength: Option[Long], override val mediaType: Option[MediaType] = None, override val boundary: Option[Boundary] = None, ) extends Body diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala index 1fd1b23501..fb91aaec2a 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala @@ -45,55 +45,60 @@ object NettyBodyWriter { // Write the end marker. ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) None - case AsyncBody(async, _, _) => - async( - new UnsafeAsync { - override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { - val nettyMsg = message match { - case b: ByteArray => Unpooled.wrappedBuffer(b.array) - case other => Unpooled.wrappedBuffer(other.toArray) - } - ctx.writeAndFlush(nettyMsg) - if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - } + case AsyncBody(async, _, _, _) => + contentLength.orElse(body.knownContentLength) match { + case Some(_) => + async( + new UnsafeAsync { + override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { + val nettyMsg = message match { + case b: ByteArray => Unpooled.wrappedBuffer(b.array) + case other => Unpooled.wrappedBuffer(other.toArray) + } + ctx.writeAndFlush(nettyMsg) + } - override def fail(cause: Throwable): Unit = - ctx.fireExceptionCaught(cause) - }, - ) - None + override def fail(cause: Throwable): Unit = + ctx.fireExceptionCaught(cause) + }, + ) + None + case None => + async( + new UnsafeAsync { + override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { + val nettyMsg = message match { + case b: ByteArray => Unpooled.wrappedBuffer(b.array) + case other => Unpooled.wrappedBuffer(other.toArray) + } + ctx.writeAndFlush(new DefaultHttpContent(nettyMsg)) + if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + } + + override def fail(cause: Throwable): Unit = + ctx.fireExceptionCaught(cause) + }, + ) + None + } case AsciiStringBody(asciiString, _, _) => - ctx.write(Unpooled.wrappedBuffer(asciiString.array())) - ctx.flush() + ctx.writeAndFlush(Unpooled.wrappedBuffer(asciiString.array())) None - case StreamBody(stream, _, _) => + case StreamBody(stream, _, _, _) => Some( - contentLength match { + contentLength.orElse(body.knownContentLength) match { case Some(length) => stream.chunks .runFoldZIO(length) { (remaining, bytes) => - remaining - bytes.size match { - case 0L => - NettyFutureExecutor.executed { - // Flushes the last body content and LastHttpContent together to avoid race conditions. - ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray))) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - }.as(0L) - - case n => - NettyFutureExecutor.executed { - ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray))) - }.as(n) - } + NettyFutureExecutor.executed { + ctx.writeAndFlush(Unpooled.wrappedBuffer(bytes.toArray)) + }.as(remaining - bytes.size) } .flatMap { case 0L => ZIO.unit case remaining => val actualLength = length - remaining - ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *> - NettyFutureExecutor.executed { - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - } + ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") } case None => @@ -109,8 +114,7 @@ object NettyBodyWriter { }, ) case ChunkBody(data, _, _) => - ctx.write(Unpooled.wrappedBuffer(data.toArray)) - ctx.flush() + ctx.writeAndFlush(Unpooled.wrappedBuffer(data.toArray)) None case EmptyBody => ctx.flush() diff --git a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala index 948382d495..c5d70aa1f2 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala @@ -48,10 +48,11 @@ object NettyResponse { unsafe: Unsafe, trace: Trace, ): ZIO[Any, Nothing, Response] = { - val status = Conversions.statusFromNetty(jRes.status()) - val headers = Conversions.headersFromNetty(jRes.headers()) + val status = Conversions.statusFromNetty(jRes.status()) + val headers = Conversions.headersFromNetty(jRes.headers()) + val knownContentLength = headers.get(Header.ContentLength).map(_.length) - if (headers.get(Header.ContentLength).map(_.length).contains(0L)) { + if (knownContentLength.contains(0L)) { onComplete .succeed(ChannelState.forStatus(status)) .as( @@ -67,9 +68,7 @@ object NettyResponse { responseHandler, ): Unit - val data = NettyBody.fromAsync { callback => - responseHandler.connect(callback) - } + val data = NettyBody.fromAsync(callback => responseHandler.connect(callback), knownContentLength) ZIO.succeed(Response(status, headers, data)) } } diff --git a/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala index 72d14d89ec..fc1466fe7f 100644 --- a/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala +++ b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala @@ -40,16 +40,17 @@ private[zio] object NettyRequestEncoder { // Host and port information should be in the headers. val path = replaceEmptyPathWithSlash(req.url).relative.addLeadingSlash.encode - val encodedReqHeaders = Conversions.headersToNetty(req.allHeaders) + val headers = Conversions.headersToNetty(req.allHeaders) - val headers = req.url.hostPort match { - case Some(host) => encodedReqHeaders.set(HttpHeaderNames.HOST, host) - case _ => encodedReqHeaders + req.url.hostPort match { + case Some(host) if !headers.contains(HttpHeaderNames.HOST) => + headers.set(HttpHeaderNames.HOST, host) + case _ => } if (req.body.isComplete) { - req.body.asChunk.map { chunk => - val content = Unpooled.wrappedBuffer(chunk.toArray) + req.body.asArray.map { array => + val content = Unpooled.wrappedBuffer(array) val writerIndex = content.writerIndex() headers.set(HttpHeaderNames.CONTENT_LENGTH, writerIndex.toString) @@ -60,7 +61,12 @@ private[zio] object NettyRequestEncoder { } } else { ZIO.attempt { - headers.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked") + req.body.knownContentLength match { + case Some(length) => + headers.set(HttpHeaderNames.CONTENT_LENGTH, length.toString) + case None => + headers.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked") + } new DefaultHttpRequest(jVersion, method, path, headers) } } diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala index f44c549d6a..10e33420cb 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala @@ -224,13 +224,9 @@ private[zio] final case class ServerInboundHandler( remoteAddress = remoteAddress, ) case nettyReq: HttpRequest => - val handler = addAsyncBodyHandler(ctx) - val body = NettyBody.fromAsync( - { async => - handler.connect(async) - }, - contentType, - ) + val knownContentLength = headers.get(Header.ContentLength).map(_.length) + val handler = addAsyncBodyHandler(ctx) + val body = NettyBody.fromAsync(async => handler.connect(async), knownContentLength, contentType) Request( body = body, diff --git a/zio-http/src/test/scala/zio/http/BodySpec.scala b/zio-http/src/test/scala/zio/http/BodySpec.scala index 88af339189..8313dd1324 100644 --- a/zio-http/src/test/scala/zio/http/BodySpec.scala +++ b/zio-http/src/test/scala/zio/http/BodySpec.scala @@ -37,7 +37,7 @@ object BodySpec extends ZIOHttpSpec { check(Gen.string) { payload => val stringBuffer = payload.getBytes(Charsets.Http) val responseContent = ZStream.fromIterable(stringBuffer, chunkSize = 2) - val res = Body.fromStream(responseContent).asString(Charsets.Http) + val res = Body.fromStreamChunked(responseContent).asString(Charsets.Http) assertZIO(res)(equalTo(payload)) } }, @@ -45,12 +45,12 @@ object BodySpec extends ZIOHttpSpec { suite("fromFile")( test("success") { lazy val file = testFile - val res = Body.fromFile(file).asString(Charsets.Http) + val res = Body.fromFile(file).flatMap(_.asString(Charsets.Http)) assertZIO(res)(equalTo("foo\nbar")) }, test("success small chunk") { lazy val file = testFile - val res = Body.fromFile(file, 3).asString(Charsets.Http) + val res = Body.fromFile(file, 3).flatMap(_.asString(Charsets.Http)) assertZIO(res)(equalTo("foo\nbar")) }, ), diff --git a/zio-http/src/test/scala/zio/http/ClientSpec.scala b/zio-http/src/test/scala/zio/http/ClientSpec.scala index 8463f74598..230baf8c46 100644 --- a/zio-http/src/test/scala/zio/http/ClientSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientSpec.scala @@ -64,7 +64,7 @@ object ClientSpec extends HttpRunnableSpec { val app = Handler.fromFunctionZIO[Request] { req => req.body.asString.map(Response.text(_)) }.sandbox.toHttpApp val stream = ZStream.fromIterable(List("a", "b", "c"), chunkSize = 1) val res = app - .deploy(Request(method = Method.POST, body = Body.fromCharSequenceStream(stream))) + .deploy(Request(method = Method.POST, body = Body.fromCharSequenceStreamChunked(stream))) .flatMap(_.body.asString) assertZIO(res)(equalTo("abc")) }, @@ -87,7 +87,7 @@ object ClientSpec extends HttpRunnableSpec { } yield assertTrue(loggedUrl == s"$baseURL/") }, test("reading of unfinished body must fail") { - val app = Handler.fromStream(ZStream.never).sandbox.toHttpApp + val app = Handler.fromStreamChunked(ZStream.never).sandbox.toHttpApp val requestCode = (client: Client) => (for { response <- ZIO.scoped(client(Request())) diff --git a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala index a8b3b78004..7d76fc71d4 100644 --- a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala @@ -37,10 +37,12 @@ object ClientStreamingSpec extends HttpRunnableSpec { handler(Response.text("simple response")), Method.GET / "streaming-get" -> handler( - Response(body = Body.fromStream(ZStream.fromIterable("streaming response".getBytes).rechunk(3))), + Response(body = Body.fromStreamChunked(ZStream.fromIterable("streaming response".getBytes).rechunk(3))), ), Method.POST / "simple-post" -> handler((req: Request) => req.ignoreBody.as(Response.ok)), - Method.POST / "streaming-echo" -> handler((req: Request) => Response(body = Body.fromStream(req.body.asStream))), + Method.POST / "streaming-echo" -> handler((req: Request) => + Response(body = Body.fromStreamChunked(req.body.asStream)), + ), Method.POST / "form" -> handler((req: Request) => req.body.asMultipartFormStream.flatMap { form => form.collectAll.flatMap { inMemoryForm => @@ -95,7 +97,7 @@ object ClientStreamingSpec extends HttpRunnableSpec { .request( Request.post( URL.decode(s"http://localhost:$port/simple-post").toOption.get, - Body.fromStream( + Body.fromStreamChunked( (ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3)) .schedule(Schedule.fixed(10.millis)), ), @@ -111,7 +113,7 @@ object ClientStreamingSpec extends HttpRunnableSpec { .request( Request.post( URL.decode(s"http://localhost:$port/streaming-echo").toOption.get, - Body.fromStream( + Body.fromStreamChunked( ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3), ), ), @@ -221,7 +223,7 @@ object ClientStreamingSpec extends HttpRunnableSpec { Request .post( URL.decode(s"http://localhost:$port/form").toOption.get, - Body.fromStream(stream), + Body.fromStreamChunked(stream), ) .addHeaders(Headers(Header.ContentType(MediaType.multipart.`form-data`, Some(boundary)))), ) @@ -243,7 +245,7 @@ object ClientStreamingSpec extends HttpRunnableSpec { .request( Request.post( URL.decode(s"http://localhost:$port/simple-post").toOption.get, - Body.fromStream(ZStream.fail(new RuntimeException("Some error"))), + Body.fromStreamChunked(ZStream.fail(new RuntimeException("Some error"))), ), ) .exit @@ -262,7 +264,7 @@ object ClientStreamingSpec extends HttpRunnableSpec { .request( Request.post( URL.decode(s"http://localhost:$port/streaming-echo").toOption.get, - Body.fromStream( + Body.fromStreamChunked( (ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3)).chunks.tap { chunk => if (chunk == Chunk.fromArray("que".getBytes)) sync.await diff --git a/zio-http/src/test/scala/zio/http/HandlerSpec.scala b/zio-http/src/test/scala/zio/http/HandlerSpec.scala index 6a45ae0cf9..e5630f7ab7 100644 --- a/zio-http/src/test/scala/zio/http/HandlerSpec.scala +++ b/zio-http/src/test/scala/zio/http/HandlerSpec.scala @@ -387,11 +387,12 @@ object HandlerSpec extends ZIOHttpSpec with ExitAssertion { val tempFile = tempPath.toFile val http = Handler.fromFileZIO(ZIO.succeed(tempFile)) for { - r <- http.apply {} + r <- http.apply {} + tempFile <- Body.fromFile(tempFile) } yield { assert(r.status)(equalTo(Status.Ok)) && assert(r.headers)(contains(Header.ContentType(MediaType.image.`jpeg`))) && - assert(r.body)(equalTo(Body.fromFile(tempFile))) + assert(r.body)(equalTo(tempFile)) } } }, diff --git a/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala b/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala index 5868d7a381..43858644a3 100644 --- a/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala +++ b/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala @@ -41,7 +41,7 @@ object ResponseCompressionSpec extends ZIOHttpSpec { Headers( Header.ContentType(MediaType.text.plain), ), - Body.fromCharSequenceStream( + Body.fromCharSequenceStreamChunked( ZStream .unfold[Long, String](0L) { s => if (s < 1000) Some((s"$s\n", s + 1)) else None diff --git a/zio-http/src/test/scala/zio/http/ServerSpec.scala b/zio-http/src/test/scala/zio/http/ServerSpec.scala index c9dcefcd54..d46c52fe53 100644 --- a/zio-http/src/test/scala/zio/http/ServerSpec.scala +++ b/zio-http/src/test/scala/zio/http/ServerSpec.scala @@ -129,7 +129,7 @@ object ServerSpec extends HttpRunnableSpec { val app = Routes(RoutePattern.any -> handler((_: Path, req: Request) => Response(body = req.body))).toHttpApp val res = - app.deploy.body.mapZIO(_.asChunk.map(_.length)).run(body = Body.fromCharSequenceStream(dataStream)) + app.deploy.body.mapZIO(_.asChunk.map(_.length)).run(body = Body.fromCharSequenceStreamChunked(dataStream)) assertZIO(res)(equalTo(MaxSize)) } } + @@ -337,13 +337,13 @@ object ServerSpec extends HttpRunnableSpec { } }, test("text streaming") { - val res = Handler.fromStream(ZStream("a", "b", "c")).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run() + val res = Handler.fromStreamChunked(ZStream("a", "b", "c")).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run() assertZIO(res)(equalTo("abc")) }, test("echo streaming") { val res = Routes .singleton(handler { (_: Path, req: Request) => - Handler.fromStream(ZStream.fromZIO(req.body.asChunk).flattenChunks): Handler[ + Handler.fromStreamChunked(ZStream.fromZIO(req.body.asChunk).flattenChunks): Handler[ Any, Throwable, (Path, Request), @@ -361,7 +361,14 @@ object ServerSpec extends HttpRunnableSpec { test("file-streaming") { val path = getClass.getResource("/TestFile.txt").getPath val res = - Handler.fromStream(ZStream.fromPath(Paths.get(path))).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run() + Handler + .fromStreamChunked(ZStream.fromPath(Paths.get(path))) + .sandbox + .toHttpApp + .deploy + .body + .mapZIO(_.asString) + .run() assertZIO(res)(equalTo("foo\nbar")) } @@ TestAspect.os(os => !os.isWindows), suite("html")( @@ -426,7 +433,7 @@ object ServerSpec extends HttpRunnableSpec { test("POST Request stream") { val app: HttpApp[Any] = Routes.singleton { handler { (_: Path, req: Request) => - Response(body = Body.fromStream(req.body.asStream)) + Response(body = Body.fromStreamChunked(req.body.asStream)) } }.toHttpApp diff --git a/zio-http/src/test/scala/zio/http/internal/HttpGen.scala b/zio-http/src/test/scala/zio/http/internal/HttpGen.scala index 1ba2149702..cba1b1ca2d 100644 --- a/zio-http/src/test/scala/zio/http/internal/HttpGen.scala +++ b/zio-http/src/test/scala/zio/http/internal/HttpGen.scala @@ -63,7 +63,8 @@ object HttpGen { url <- HttpGen.url headers <- Gen.listOf(HttpGen.header).map(Headers(_)) version <- httpVersion - } yield Request(version, method, url, headers, Body.fromFile(file), None) + body <- Gen.fromZIO(Body.fromFile(file)) + } yield Request(version, method, url, headers, body, None) } def genAbsoluteLocation: Gen[Any, Location.Absolute] = for { @@ -97,7 +98,7 @@ object HttpGen { cnt <- Gen .fromIterable( List( - Body.fromStream( + Body.fromStreamChunked( ZStream.fromIterable(list, chunkSize = 2).map(b => Chunk.fromArray(b.getBytes())).flattenChunks, ), Body.fromString(list.mkString("")), @@ -134,7 +135,7 @@ object HttpGen { cnt <- Gen .fromIterable( List( - Body.fromStream( + Body.fromStreamChunked( ZStream.fromIterable(list, chunkSize = 2).map(b => Chunk.fromArray(b.getBytes())).flattenChunks, ), Body.fromString(list.mkString("")), diff --git a/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala b/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala index 478d51cc2d..6ce5760ca4 100644 --- a/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala @@ -31,12 +31,12 @@ object NettyBodySpec extends ZIOHttpSpec { suite("fromAsync")( test("success") { val message = Chunk.fromArray("Hello World".getBytes(Charsets.Http)) - val chunk = NettyBody.fromAsync(async => async(message, isLast = true)).asChunk + val chunk = NettyBody.fromAsync(async => async(message, isLast = true), knownContentLength = None).asChunk assertZIO(chunk)(equalTo(message)) }, test("fail") { val exception = new RuntimeException("Some Error") - val error = NettyBody.fromAsync(_ => throw exception).asChunk.flip + val error = NettyBody.fromAsync(_ => throw exception, knownContentLength = None).asChunk.flip assertZIO(error)(equalTo(exception)) }, ), diff --git a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index 11a93802d2..eea6001ab5 100644 --- a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -19,10 +19,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec { handler( http.Response( status = Status.Ok, - // content length header is important, - // in this case the server will not use chunked transfer encoding even if response is a stream - headers = Headers(Header.ContentLength(len)), - body = Body.fromStream(streams.next()), + body = Body.fromStream(streams.next(), len), ), ), ).sandbox.toHttpApp diff --git a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala index 340d7674b8..0f003c9270 100644 --- a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala @@ -31,7 +31,7 @@ import zio.http.netty.NettyConfig object NettyConnectionPoolSpec extends HttpRunnableSpec { private val app = Routes( - Method.POST / "streaming" -> handler((req: Request) => Response(body = Body.fromStream(req.body.asStream))), + Method.POST / "streaming" -> handler((req: Request) => Response(body = Body.fromStreamChunked(req.body.asStream))), Method.GET / "slow" -> handler(ZIO.sleep(1.hour).as(Response.text("done"))), Method.ANY / trailing -> handler((_: Path, req: Request) => req.body.asString.map(Response.text(_))), ).sandbox.toHttpApp @@ -76,7 +76,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { .deploy( Request( method = Method.POST, - body = Body.fromCharSequenceStream(stream), + body = Body.fromCharSequenceStreamChunked(stream), headers = extraHeaders, ), ) @@ -113,7 +113,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { Request( method = Method.POST, url = URL.root / "streaming", - body = Body.fromCharSequenceStream(stream), + body = Body.fromCharSequenceStreamChunked(stream), headers = extraHeaders, ), )