Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Content-Length, chunked transfer encoding and Host header improvements #2563

Merged
merged 5 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/dsl/body.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ To create an `Body` that encodes a Stream you can use `Body.fromStream`.
- Using a Stream of Bytes

```scala mdoc:silent
val streamHttpData1: Body = Body.fromStream(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
val streamHttpData1: Body = Body.fromStreamChunked(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
```

- Using a Stream of String

```scala mdoc:silent
val streamHttpData2: Body = Body.fromCharSequenceStream(ZStream("a", "b", "c"), Charsets.Http)
val streamHttpData2: Body = Body.fromCharSequenceStreamChunked(ZStream("a", "b", "c"), Charsets.Http)
```

### Creating a Body from a `File`

To create an `Body` that encodes a File you can use `Body.fromFile`:

```scala mdoc:silent:crash
val fileHttpData: Body = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
val fileHttpData: ZIO[Any, Nothing, Body] = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
```
4 changes: 1 addition & 3 deletions docs/dsl/headers.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ object SimpleResponseDispatcher extends ZIOAppDefault {
if (acceptsStreaming)
Response(
status = Status.Ok,
// Setting response header
headers = Headers(Header.ContentLength(message.length.toLong)), // adding CONTENT-LENGTH header
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
)
else {
// Adding a custom header to Response
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/advanced/streaming-file.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object FileStreaming extends ZIOAppDefault {

// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),

// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
Expand Down
3 changes: 1 addition & 2 deletions docs/examples/advanced/streaming-response.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
headers = Headers(Header.ContentLength(message.length.toLong)),
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[cli] object Retriever {

override def retrieve(): Task[FormField] =
for {
chunk <- Body.fromFile(new java.io.File(path.toUri())).asChunk
chunk <- Body.fromFile(new java.io.File(path.toUri())).flatMap(_.asChunk)
} yield FormField.binaryField(name, chunk, media)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object FileStreaming extends ZIOAppDefault {

// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),

// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object RequestStreaming extends ZIOAppDefault {

// Creating HttpData from the stream
// This works for file of any size
val data = Body.fromStream(stream)
val data = Body.fromStreamChunked(stream)

Response(body = data)
}).toHttpApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
headers = Headers(Header.ContentLength(message.length.toLong)),
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
Expand Down
63 changes: 51 additions & 12 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.http

import java.io.FileInputStream
import java.io.{FileInputStream, IOException}
import java.nio.charset._
import java.nio.file._

Expand Down Expand Up @@ -120,6 +120,11 @@ trait Body { self =>
*/
def isComplete: Boolean

/**
* Returns whether or not the content length is known
*/
def knownContentLength: Option[Long]

/**
* Returns whether or not the body is known to be empty. Note that some bodies
* may not be known to be empty until an attempt is made to consume them.
Expand Down Expand Up @@ -167,8 +172,10 @@ object Body {
/**
* Constructs a [[zio.http.Body]] from the contents of a file.
*/
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4): Body =
FileBody(file, chunkSize)
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] =
ZIO.succeed(file.length()).map { fileSize =>
FileBody(file, chunkSize, fileSize)
}

/**
* Constructs a [[zio.http.Body]] from from form data, using multipart
Expand All @@ -180,7 +187,7 @@ object Body {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)

StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
}

/**
Expand All @@ -192,26 +199,48 @@ object Body {
form: Form,
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(boundary))
}

/**
* Constructs a [[zio.http.Body]] from a stream of bytes.
* Constructs a [[zio.http.Body]] from a stream of bytes with a known length.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte], contentLength: Long): Body =
StreamBody(stream, knownContentLength = Some(contentLength))

/**
* Constructs a [[zio.http.Body]] from a stream of bytes of unknown length,
* using chunked transfer encoding.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream)
def fromStreamChunked(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream, knownContentLength = None)

/**
* Constructs a [[zio.http.Body]] from a stream of text, using the specified
* character set, which defaults to the HTTP character set.
* Constructs a [[zio.http.Body]] from a stream of text with known length,
* using the specified character set, which defaults to the HTTP character
* set.
*/
def fromCharSequenceStream(
stream: ZStream[Any, Throwable, CharSequence],
contentLength: Long,
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks, contentLength)

/**
* Constructs a [[zio.http.Body]] from a stream of text with unknown length
* using chunked transfer encoding, using the specified character set, which
* defaults to the HTTP character set.
*/
def fromCharSequenceStreamChunked(
stream: ZStream[Any, Throwable, CharSequence],
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStreamChunked(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)

/**
* Helper to create Body from String
Expand Down Expand Up @@ -262,6 +291,8 @@ object Body {
override def contentType(newMediaType: MediaType): Body = EmptyBody

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = EmptyBody

override def knownContentLength: Option[Long] = Some(0L)
}

private[zio] final case class ChunkBody(
Expand Down Expand Up @@ -291,11 +322,14 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(data.length.toLong)
}

private[zio] final case class FileBody(
val file: java.io.File,
file: java.io.File,
chunkSize: Int = 1024 * 4,
fileSize: Long,
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
Expand Down Expand Up @@ -339,10 +373,13 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(fileSize)
}

private[zio] final case class StreamBody(
stream: ZStream[Any, Throwable, Byte],
knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body {
Expand Down Expand Up @@ -385,6 +422,8 @@ object Body {

def contentType(newMediaType: zio.http.MediaType, newBoundary: zio.http.Boundary): zio.http.Body = this

override def knownContentLength: Option[Long] = Some(0L)

}

private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)
Expand Down
64 changes: 45 additions & 19 deletions zio-http/src/main/scala/zio/http/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -832,17 +832,18 @@ object Handler {
ZIO.fail(new AccessDeniedException(file.getAbsolutePath))
} else {
if (file.isFile) {
val length = Headers(Header.ContentLength(file.length()))
val response = http.Response(headers = length, body = Body.fromFile(file))
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
Body.fromFile(file).flatMap { body =>
val response = http.Response(body = body)
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
}
}
} else {
ZIO.fail(new NotDirectoryException(s"Found directory instead of a file."))
Expand Down Expand Up @@ -897,11 +898,10 @@ object Handler {
.acquireReleaseWith(openZip)(closeZip)
.mapZIO(jar => ZIO.attemptBlocking(jar.getEntry(resourcePath) -> jar))
.flatMap { case (entry, jar) => ZStream.fromInputStream(jar.getInputStream(entry)) }
response = Response(body = Body.fromStream(inZStream))
response = Response(body = Body.fromStream(inZStream, contentLength))
} yield mediaType.fold(response) { t =>
response
.addHeader(Header.ContentType(t))
.addHeader(Header.ContentLength(contentLength))
}
}
}
Expand All @@ -913,27 +913,53 @@ object Handler {

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
def fromStream[R](stream: ZStream[R, Throwable, String], contentLength: Long, charset: Charset = Charsets.Http)(
implicit trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), contentLength, charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte], contentLength: Long)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env), contentLength))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body using chunked transfer encoding
*/
def fromStreamChunked[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), charset))
fromBody(Body.fromCharSequenceStreamChunked(stream.provideEnvironment(env), charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream as the body using chunked transfer encoding
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte])(implicit
def fromStreamChunked[R](stream: ZStream[R, Throwable, Byte])(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env)))
fromBody(Body.fromStreamChunked(stream.provideEnvironment(env)))
}
}.flatten

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object Response {
* \- stream of data to be sent as Server Sent Events
*/
def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response =
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStream(data.map(_.encode)))
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode)))

/**
* Creates a new response for the provided socket app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ private[http] object BodyCodec {
ZIO.succeed((body.asStream >>> ZPipeline.decodeCharsWith(Charset.defaultCharset()) >>> codec.streamDecoder).orDie)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: BinaryCodec[E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder)
Body.fromStreamChunked(value >>> codec.streamEncoder)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: Codec[String, Char, E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder.map(_.toByte))
Body.fromStreamChunked(value >>> codec.streamEncoder.map(_.toByte))

type Element = E
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,15 @@ private[codec] object EncoderDecoder {
} else None
private def encodeBody(inputs: Array[Any], contentType: => Header.ContentType): Body = {
if (isByteStream) {
Body.fromStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
Body.fromStreamChunked(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
} else {
if (inputs.length > 1) {
Body.fromMultipartForm(encodeMultipartFormData(inputs), formBoundary)
} else {
if (isEventStream) {
Body.fromCharSequenceStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode))
Body.fromCharSequenceStreamChunked(
inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode),
)
} else if (inputs.length < 1) {
Body.empty
} else {
Expand Down
Loading
Loading