From 339ad36634b68ab5b4d90fb9991e38d5020a1039 Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Fri, 15 May 2020 15:36:26 +0200 Subject: [PATCH] Add Brotli support New Functions: CompressHandlerBrotliLevel(h RequestHandler, brotliLevel, otherLevel int) RequestHandler Request.BodyUnbrotli() ([]byte, error) Response.BodyUnbrotli() ([]byte, error) AppendBrotliBytesLevel(dst, src []byte, level int) []byte WriteBrotliLevel(w io.Writer, p []byte, level int) (int, error) WriteBrotli(w io.Writer, p []byte) (int, error) AppendBrotliBytes(dst, src []byte) []byte WriteUnbrotli(w io.Writer, p []byte) (int, error) AppendUnbrotliBytes(dst, src []byte) ([]byte, error) New Constants: CompressBrotliNoCompression CompressBrotliBestSpeed CompressBrotliBestCompression CompressBrotliDefaultCompression Brotli compression levels are different from gzip/flate. Because of this we have separate level constants and CompressHandlerBrotliLevel takes 2 levels. I didn't add Brotli support to CompressHandler as this could cause a spike in CPU usage when users upgrade fasthttp. fasthttp.CompressBrotliDefaultCompression is not the same as brotli.DefaultCompression. brotli.DefaultCompression is more than twice as slow as fasthttp.CompressBrotliDefaultCompression which I thought was unreasonable as default. --- brotli.go | 193 +++++++++++++++++++++++++++++++++++++++++++++++++ brotli_test.go | 177 +++++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + http.go | 82 +++++++++++++++++++++ server.go | 33 ++++++++- strings.go | 1 + 7 files changed, 488 insertions(+), 1 deletion(-) create mode 100644 brotli.go create mode 100644 brotli_test.go diff --git a/brotli.go b/brotli.go new file mode 100644 index 0000000000..a88fdcec8c --- /dev/null +++ b/brotli.go @@ -0,0 +1,193 @@ +package fasthttp + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/andybalholm/brotli" + "github.com/valyala/bytebufferpool" + "github.com/valyala/fasthttp/stackless" +) + +// Supported compression levels. +const ( + CompressBrotliNoCompression = 0 + CompressBrotliBestSpeed = brotli.BestSpeed + CompressBrotliBestCompression = brotli.BestCompression + + // Choose a default brotli compression level comparable to + // CompressDefaultCompression (gzip 6) + // See: https://github.com/valyala/fasthttp/issues/798#issuecomment-626293806 + CompressBrotliDefaultCompression = 4 +) + +func acquireBrotliReader(r io.Reader) (*brotli.Reader, error) { + v := brotliReaderPool.Get() + if v == nil { + return brotli.NewReader(r), nil + } + zr := v.(*brotli.Reader) + if err := zr.Reset(r); err != nil { + return nil, err + } + return zr, nil +} + +func releaseBrotliReader(zr *brotli.Reader) { + brotliReaderPool.Put(zr) +} + +var brotliReaderPool sync.Pool + +func acquireStacklessBrotliWriter(w io.Writer, level int) stackless.Writer { + nLevel := normalizeBrotliCompressLevel(level) + p := stacklessBrotliWriterPoolMap[nLevel] + v := p.Get() + if v == nil { + return stackless.NewWriter(w, func(w io.Writer) stackless.Writer { + return acquireRealBrotliWriter(w, level) + }) + } + sw := v.(stackless.Writer) + sw.Reset(w) + return sw +} + +func releaseStacklessBrotliWriter(sw stackless.Writer, level int) { + sw.Close() + nLevel := normalizeBrotliCompressLevel(level) + p := stacklessBrotliWriterPoolMap[nLevel] + p.Put(sw) +} + +func acquireRealBrotliWriter(w io.Writer, level int) *brotli.Writer { + nLevel := normalizeBrotliCompressLevel(level) + p := realBrotliWriterPoolMap[nLevel] + v := p.Get() + if v == nil { + zw := brotli.NewWriterLevel(w, level) + return zw + } + zw := v.(*brotli.Writer) + zw.Reset(w) + return zw +} + +func releaseRealBrotliWriter(zw *brotli.Writer, level int) { + zw.Close() + nLevel := normalizeBrotliCompressLevel(level) + p := realBrotliWriterPoolMap[nLevel] + p.Put(zw) +} + +var ( + stacklessBrotliWriterPoolMap = newCompressWriterPoolMap() + realBrotliWriterPoolMap = newCompressWriterPoolMap() +) + +// AppendBrotliBytesLevel appends brotlied src to dst using the given +// compression level and returns the resulting dst. +// +// Supported compression levels are: +// +// * CompressBrotliNoCompression +// * CompressBrotliBestSpeed +// * CompressBrotliBestCompression +// * CompressBrotliDefaultCompression +func AppendBrotliBytesLevel(dst, src []byte, level int) []byte { + w := &byteSliceWriter{dst} + WriteBrotliLevel(w, src, level) //nolint:errcheck + return w.b +} + +// WriteBrotliLevel writes brotlied p to w using the given compression level +// and returns the number of compressed bytes written to w. +// +// Supported compression levels are: +// +// * CompressBrotliNoCompression +// * CompressBrotliBestSpeed +// * CompressBrotliBestCompression +// * CompressBrotliDefaultCompression +func WriteBrotliLevel(w io.Writer, p []byte, level int) (int, error) { + switch w.(type) { + case *byteSliceWriter, + *bytes.Buffer, + *bytebufferpool.ByteBuffer: + // These writers don't block, so we can just use stacklessWriteBrotli + ctx := &compressCtx{ + w: w, + p: p, + level: level, + } + stacklessWriteBrotli(ctx) + return len(p), nil + default: + zw := acquireStacklessBrotliWriter(w, level) + n, err := zw.Write(p) + releaseStacklessBrotliWriter(zw, level) + return n, err + } +} + +var stacklessWriteBrotli = stackless.NewFunc(nonblockingWriteBrotli) + +func nonblockingWriteBrotli(ctxv interface{}) { + ctx := ctxv.(*compressCtx) + zw := acquireRealBrotliWriter(ctx.w, ctx.level) + + _, err := zw.Write(ctx.p) + if err != nil { + panic(fmt.Sprintf("BUG: brotli.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err)) + } + + releaseRealBrotliWriter(zw, ctx.level) +} + +// WriteBrotli writes brotlied p to w and returns the number of compressed +// bytes written to w. +func WriteBrotli(w io.Writer, p []byte) (int, error) { + return WriteBrotliLevel(w, p, CompressBrotliDefaultCompression) +} + +// AppendBrotliBytes appends brotlied src to dst and returns the resulting dst. +func AppendBrotliBytes(dst, src []byte) []byte { + return AppendBrotliBytesLevel(dst, src, CompressBrotliDefaultCompression) +} + +// WriteUnbrotli writes unbrotlied p to w and returns the number of uncompressed +// bytes written to w. +func WriteUnbrotli(w io.Writer, p []byte) (int, error) { + r := &byteSliceReader{p} + zr, err := acquireBrotliReader(r) + if err != nil { + return 0, err + } + n, err := copyZeroAlloc(w, zr) + releaseBrotliReader(zr) + nn := int(n) + if int64(nn) != n { + return 0, fmt.Errorf("too much data unbrotlied: %d", n) + } + return nn, err +} + +// AppendUnbrotliBytes appends unbrotlied src to dst and returns the resulting dst. +func AppendUnbrotliBytes(dst, src []byte) ([]byte, error) { + w := &byteSliceWriter{dst} + _, err := WriteUnbrotli(w, src) + return w.b, err +} + +// normalizes compression level into [0..11], so it could be used as an index +// in *PoolMap. +func normalizeBrotliCompressLevel(level int) int { + // -2 is the lowest compression level - CompressHuffmanOnly + // 9 is the highest compression level - CompressBestCompression + if level < 0 || level > 11 { + level = CompressBrotliDefaultCompression + } + return level +} diff --git a/brotli_test.go b/brotli_test.go new file mode 100644 index 0000000000..1ac94dd01c --- /dev/null +++ b/brotli_test.go @@ -0,0 +1,177 @@ +package fasthttp + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "testing" +) + +func TestBrotliBytesSerial(t *testing.T) { + t.Parallel() + + if err := testBrotliBytes(); err != nil { + t.Fatal(err) + } +} + +func TestBrotliBytesConcurrent(t *testing.T) { + t.Parallel() + + if err := testConcurrent(10, testBrotliBytes); err != nil { + t.Fatal(err) + } +} + +func testBrotliBytes() error { + for _, s := range compressTestcases { + if err := testBrotliBytesSingleCase(s); err != nil { + return err + } + } + return nil +} + +func testBrotliBytesSingleCase(s string) error { + prefix := []byte("foobar") + brotlipedS := AppendBrotliBytes(prefix, []byte(s)) + if !bytes.Equal(brotlipedS[:len(prefix)], prefix) { + return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, brotlipedS[:len(prefix)], prefix) + } + + unbrotliedS, err := AppendUnbrotliBytes(prefix, brotlipedS[len(prefix):]) + if err != nil { + return fmt.Errorf("unexpected error when uncompressing %q: %s", s, err) + } + if !bytes.Equal(unbrotliedS[:len(prefix)], prefix) { + return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, unbrotliedS[:len(prefix)], prefix) + } + unbrotliedS = unbrotliedS[len(prefix):] + if string(unbrotliedS) != s { + return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", unbrotliedS, s) + } + return nil +} + +func TestBrotliCompressSerial(t *testing.T) { + t.Parallel() + + if err := testBrotliCompress(); err != nil { + t.Fatal(err) + } +} + +func TestBrotliCompressConcurrent(t *testing.T) { + t.Parallel() + + if err := testConcurrent(10, testBrotliCompress); err != nil { + t.Fatal(err) + } +} + +func testBrotliCompress() error { + for _, s := range compressTestcases { + if err := testBrotliCompressSingleCase(s); err != nil { + return err + } + } + return nil +} + +func testBrotliCompressSingleCase(s string) error { + var buf bytes.Buffer + zw := acquireStacklessBrotliWriter(&buf, CompressDefaultCompression) + if _, err := zw.Write([]byte(s)); err != nil { + return fmt.Errorf("unexpected error: %s. s=%q", err, s) + } + releaseStacklessBrotliWriter(zw, CompressDefaultCompression) + + zr, err := acquireBrotliReader(&buf) + if err != nil { + return fmt.Errorf("unexpected error: %s. s=%q", err, s) + } + body, err := ioutil.ReadAll(zr) + if err != nil { + return fmt.Errorf("unexpected error: %s. s=%q", err, s) + } + if string(body) != s { + return fmt.Errorf("unexpected string after decompression: %q. Expecting %q", body, s) + } + releaseBrotliReader(zr) + return nil +} + +func TestCompressHandlerBrotliLevel(t *testing.T) { + t.Parallel() + + expectedBody := string(createFixedBody(2e4)) + h := CompressHandlerBrotliLevel(func(ctx *RequestCtx) { + ctx.Write([]byte(expectedBody)) //nolint:errcheck + }, CompressBrotliDefaultCompression, CompressDefaultCompression) + + var ctx RequestCtx + var resp Response + + // verify uncompressed response + h(&ctx) + s := ctx.Response.String() + br := bufio.NewReader(bytes.NewBufferString(s)) + if err := resp.Read(br); err != nil { + t.Fatalf("unexpected error: %s", err) + } + ce := resp.Header.Peek(HeaderContentEncoding) + if string(ce) != "" { + t.Fatalf("unexpected Content-Encoding: %q. Expecting %q", ce, "") + } + body := resp.Body() + if string(body) != expectedBody { + t.Fatalf("unexpected body %q. Expecting %q", body, expectedBody) + } + + // verify gzip-compressed response + ctx.Request.Reset() + ctx.Response.Reset() + ctx.Request.Header.Set("Accept-Encoding", "gzip, deflate, sdhc") + + h(&ctx) + s = ctx.Response.String() + br = bufio.NewReader(bytes.NewBufferString(s)) + if err := resp.Read(br); err != nil { + t.Fatalf("unexpected error: %s", err) + } + ce = resp.Header.Peek(HeaderContentEncoding) + if string(ce) != "gzip" { + t.Fatalf("unexpected Content-Encoding: %q. Expecting %q", ce, "gzip") + } + body, err := resp.BodyGunzip() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if string(body) != expectedBody { + t.Fatalf("unexpected body %q. Expecting %q", body, expectedBody) + } + + // verify brotli-compressed response + ctx.Request.Reset() + ctx.Response.Reset() + ctx.Request.Header.Set("Accept-Encoding", "gzip, deflate, sdhc, br") + + h(&ctx) + s = ctx.Response.String() + br = bufio.NewReader(bytes.NewBufferString(s)) + if err := resp.Read(br); err != nil { + t.Fatalf("unexpected error: %s", err) + } + ce = resp.Header.Peek(HeaderContentEncoding) + if string(ce) != "br" { + t.Fatalf("unexpected Content-Encoding: %q. Expecting %q", ce, "br") + } + body, err = resp.BodyUnbrotli() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if string(body) != expectedBody { + t.Fatalf("unexpected body %q. Expecting %q", body, expectedBody) + } +} diff --git a/go.mod b/go.mod index ba38c27803..8ebfed6169 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/valyala/fasthttp go 1.11 require ( + github.com/andybalholm/brotli v1.0.0 github.com/klauspost/compress v1.10.4 github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a diff --git a/go.sum b/go.sum index 96298ae163..863089e774 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4= +github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/klauspost/compress v1.10.4 h1:jFzIFaf586tquEB5EhzQG0HwGNSlgAJpG53G6Ss11wc= github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/http.go b/http.go index 3869b0ac36..7cc551a87e 100644 --- a/http.go +++ b/http.go @@ -395,6 +395,33 @@ func gunzipData(p []byte) ([]byte, error) { return bb.B, nil } +// BodyUnbrotli returns un-gzipped body data. +// +// This method may be used if the request header contains +// 'Content-Encoding: gzip' for reading un-gzipped body. +// Use Body for reading gzipped request body. +func (req *Request) BodyUnbrotli() ([]byte, error) { + return unBrotliData(req.Body()) +} + +// BodyUnbrotli returns un-gzipped body data. +// +// This method may be used if the response header contains +// 'Content-Encoding: gzip' for reading un-gzipped body. +// Use Body for reading gzipped response body. +func (resp *Response) BodyUnbrotli() ([]byte, error) { + return unBrotliData(resp.Body()) +} + +func unBrotliData(p []byte) ([]byte, error) { + var bb bytebufferpool.ByteBuffer + _, err := WriteUnbrotli(&bb, p) + if err != nil { + return nil, err + } + return bb.B, nil +} + // BodyInflate returns inflated body data. // // This method may be used if the response header contains @@ -1273,6 +1300,61 @@ func (resp *Response) WriteDeflateLevel(w *bufio.Writer, level int) error { return resp.Write(w) } +func (resp *Response) brotliBody(level int) error { + if len(resp.Header.peek(strContentEncoding)) > 0 { + // It looks like the body is already compressed. + // Do not compress it again. + return nil + } + + if !resp.Header.isCompressibleContentType() { + // The content-type cannot be compressed. + return nil + } + + if resp.bodyStream != nil { + // Reset Content-Length to -1, since it is impossible + // to determine body size beforehand of streamed compression. + // For https://github.com/valyala/fasthttp/issues/176 . + resp.Header.SetContentLength(-1) + + // Do not care about memory allocations here, since brotli is slow + // and allocates a lot of memory by itself. + bs := resp.bodyStream + resp.bodyStream = NewStreamReader(func(sw *bufio.Writer) { + zw := acquireStacklessBrotliWriter(sw, level) + fw := &flushWriter{ + wf: zw, + bw: sw, + } + copyZeroAlloc(fw, bs) //nolint:errcheck + releaseStacklessBrotliWriter(zw, level) + if bsc, ok := bs.(io.Closer); ok { + bsc.Close() + } + }) + } else { + bodyBytes := resp.bodyBytes() + if len(bodyBytes) < minCompressLen { + // There is no sense in spending CPU time on small body compression, + // since there is a very high probability that the compressed + // body size will be bigger than the original body size. + return nil + } + w := responseBodyPool.Get() + w.B = AppendBrotliBytesLevel(w.B, bodyBytes, level) + + // Hack: swap resp.body with w. + if resp.body != nil { + responseBodyPool.Put(resp.body) + } + resp.body = w + resp.bodyRaw = nil + } + resp.Header.SetCanonical(strContentEncoding, strBr) + return nil +} + func (resp *Response) gzipBody(level int) error { if len(resp.Header.peek(strContentEncoding)) > 0 { // It looks like the body is already compressed. diff --git a/server.go b/server.go index 80ffd39638..b1081b8654 100644 --- a/server.go +++ b/server.go @@ -472,7 +472,7 @@ func CompressHandler(h RequestHandler) RequestHandler { } // CompressHandlerLevel returns RequestHandler that transparently compresses -// response body generated by h if the request contains 'gzip' or 'deflate' +// response body generated by h if the request contains a 'gzip' or 'deflate' // 'Accept-Encoding' header. // // Level is the desired compression level: @@ -493,6 +493,37 @@ func CompressHandlerLevel(h RequestHandler, level int) RequestHandler { } } +// CompressHandlerBrotliLevel returns RequestHandler that transparently compresses +// response body generated by h if the request contains a 'br', 'gzip' or 'deflate' +// 'Accept-Encoding' header. +// +// brotliLevel is the desired compression level for brotli. +// +// * CompressBrotliNoCompression +// * CompressBrotliBestSpeed +// * CompressBrotliBestCompression +// * CompressBrotliDefaultCompression +// +// otherLevel is the desired compression level for gzip and deflate. +// +// * CompressNoCompression +// * CompressBestSpeed +// * CompressBestCompression +// * CompressDefaultCompression +// * CompressHuffmanOnly +func CompressHandlerBrotliLevel(h RequestHandler, brotliLevel, otherLevel int) RequestHandler { + return func(ctx *RequestCtx) { + h(ctx) + if ctx.Request.Header.HasAcceptEncodingBytes(strBr) { + ctx.Response.brotliBody(brotliLevel) //nolint:errcheck + } else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) { + ctx.Response.gzipBody(otherLevel) //nolint:errcheck + } else if ctx.Request.Header.HasAcceptEncodingBytes(strDeflate) { + ctx.Response.deflateBody(otherLevel) //nolint:errcheck + } + } +} + // RequestCtx contains incoming request and manages outgoing response. // // It is forbidden copying RequestCtx instances. diff --git a/strings.go b/strings.go index 12f1926308..abd16ebca4 100644 --- a/strings.go +++ b/strings.go @@ -69,6 +69,7 @@ var ( strClose = []byte("close") strGzip = []byte("gzip") + strBr = []byte("br") strDeflate = []byte("deflate") strKeepAlive = []byte("keep-alive") strUpgrade = []byte("Upgrade")