Skip to content

Commit

Permalink
routesrv: accept gzip encoding (#2738)
Browse files Browse the repository at this point in the history
Support gzip encoding to reduce response size.

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Nov 16, 2023
1 parent ec3881d commit a92ff60
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 13 deletions.
52 changes: 42 additions & 10 deletions routesrv/eskipbytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package routesrv

import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -50,12 +52,15 @@ var (
// provides synchronized r/w access to them. Additionally it can
// serve as an HTTP handler exposing its content.
type eskipBytes struct {
mu sync.RWMutex
data []byte
etag string
hash string
lastModified time.Time
initialized bool
count int
mu sync.RWMutex

zw *gzip.Writer
zdata []byte

tracer ot.Tracer
metrics metrics.Metrics
Expand All @@ -77,13 +82,33 @@ func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initi
if updated {
e.lastModified = e.now()
e.data = data
e.etag = fmt.Sprintf(`"%x"`, sha256.Sum256(e.data))
e.zdata = e.compressLocked(data)
e.hash = fmt.Sprintf("%x", sha256.Sum256(e.data))
e.count = len(routes)
}
initialized = !e.initialized
e.initialized = true

return len(e.data), e.etag, initialized, updated
return len(e.data), e.hash, initialized, updated
}

// compressLocked compresses the data with gzip and returns
// the compressed data or nil if compression fails.
// e.mu must be held.
func (e *eskipBytes) compressLocked(data []byte) []byte {
var buf bytes.Buffer
if e.zw == nil {
e.zw = gzip.NewWriter(&buf)
} else {
e.zw.Reset(&buf)
}
if _, err := e.zw.Write(data); err != nil {
return nil
}
if err := e.zw.Close(); err != nil {
return nil
}
return buf.Bytes()
}

func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -112,17 +137,24 @@ func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
e.mu.RLock()
count := e.count
data := e.data
etag := e.etag
zdata := e.zdata
hash := e.hash
lastModified := e.lastModified
initialized := e.initialized
e.mu.RUnlock()

if initialized {
w.Header().Add("Etag", etag)
w.Header().Add("Content-Type", "text/plain; charset=utf-8")
w.Header().Add(routing.RoutesCountName, strconv.Itoa(count))

http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set(routing.RoutesCountName, strconv.Itoa(count))

if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && len(zdata) > 0 {
w.Header().Set("Etag", `"`+hash+`+gzip"`)
w.Header().Set("Content-Encoding", "gzip")
http.ServeContent(w, r, "", lastModified, bytes.NewReader(zdata))
} else {
w.Header().Set("Etag", `"`+hash+`"`)
http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
}
} else {
w.WriteHeader(http.StatusNotFound)
}
Expand Down
6 changes: 3 additions & 3 deletions routesrv/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func (p *poller) poll(wg *sync.WaitGroup) {
"message", LogRoutesEmpty,
)
case routesCount > 0:
routesBytes, routesEtag, initialized, updated := p.b.formatAndSet(routes)
logger := log.WithFields(log.Fields{"count": routesCount, "bytes": routesBytes, "etag": routesEtag})
routesBytes, routesHash, initialized, updated := p.b.formatAndSet(routes)
logger := log.WithFields(log.Fields{"count": routesCount, "bytes": routesBytes, "hash": routesHash})
if initialized {
logger.Info(LogRoutesInitialized)
span.SetTag("routes.initialized", true)
Expand All @@ -94,7 +94,7 @@ func (p *poller) poll(wg *sync.WaitGroup) {
}
span.SetTag("routes.count", routesCount)
span.SetTag("routes.bytes", routesBytes)
span.SetTag("routes.etag", routesEtag)
span.SetTag("routes.hash", routesHash)

if updated && log.IsLevelEnabled(log.DebugLevel) {
routesById := mapRoutes(routes)
Expand Down
92 changes: 92 additions & 0 deletions routesrv/routesrv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routesrv_test

import (
"bytes"
"compress/gzip"
"flag"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/zalando/skipper"
"github.com/zalando/skipper/dataclients/kubernetes/kubernetestest"
Expand Down Expand Up @@ -724,3 +726,93 @@ func TestRoutesWithExplicitLBAlgorithm(t *testing.T) {
}
wantHTTPCode(t, responseRecorder, http.StatusOK)
}

func TestESkipBytesHandlerGzip(t *testing.T) {
defer tl.Reset()
ks, handler := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml"))
ks.Start()
defer ks.Close()
rs := newRouteServer(t, ks)

rs.StartUpdates()
defer rs.StopUpdates()

testGzipResponse := func(t *testing.T, count int) {
// Get plain response
plainResponse := getRoutes(rs)
plainEtag := plainResponse.Header().Get("Etag")
plainContent := plainResponse.Body.Bytes()

// Get gzip response
gzipResponse := getRoutesWithRequestHeadersSetting(rs, map[string]string{"Accept-Encoding": "gzip"})
assert.Equal(t, http.StatusOK, gzipResponse.Code)
assert.Equal(t, "text/plain; charset=utf-8", gzipResponse.Header().Get("Content-Type"))
assert.Equal(t, "gzip", gzipResponse.Header().Get("Content-Encoding"))
assert.Equal(t, strconv.Itoa(count), gzipResponse.Header().Get("X-Count"))

gzipEtag := gzipResponse.Header().Get("Etag")
assert.NotEqual(t, plainEtag, gzipEtag, "gzip Etag should differ from plain Etag")

zr, err := gzip.NewReader(gzipResponse.Body)
require.NoError(t, err)
defer zr.Close()

gzipContent, err := io.ReadAll(zr)
require.NoError(t, err)

assert.Equal(t, plainContent, gzipContent, "gzip content should be equal to plain content")

// Get gzip response using Etag
gzipEtagResponse := getRoutesWithRequestHeadersSetting(rs, map[string]string{"If-None-Match": gzipEtag, "Accept-Encoding": "gzip"})

assert.Equal(t, http.StatusNotModified, gzipEtagResponse.Code)
// RFC 7232 section 4.1:
assert.Empty(t, gzipEtagResponse.Header().Get("Content-Type"))
assert.Empty(t, gzipEtagResponse.Header().Get("Content-Length"))
assert.Empty(t, gzipEtagResponse.Header().Get("Content-Encoding"))
assert.Equal(t, strconv.Itoa(count), gzipEtagResponse.Header().Get("X-Count"))
assert.Empty(t, gzipEtagResponse.Body.String())
}

require.NoError(t, tl.WaitFor(routesrv.LogRoutesInitialized, waitTimeout))
testGzipResponse(t, 3)

handler.set(newKubeAPI(t, loadKubeYAML(t, "testdata/lb-target-single.yaml")))
require.NoError(t, tl.WaitForN(routesrv.LogRoutesUpdated, 2, waitTimeout))

testGzipResponse(t, 2)
}

func TestESkipBytesHandlerGzipServedForDefaultClient(t *testing.T) {
defer tl.Reset()
ks, _ := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml"))
ks.Start()
defer ks.Close()

rs, err := routesrv.New(skipper.Options{
SourcePollTimeout: pollInterval,
KubernetesURL: ks.URL,
})
require.NoError(t, err)

rs.StartUpdates()
defer rs.StopUpdates()

require.NoError(t, tl.WaitFor(routesrv.LogRoutesInitialized, waitTimeout))

ts := httptest.NewServer(rs)
defer ts.Close()

resp, err := ts.Client().Get(ts.URL + "/routes")
require.NoError(t, err)

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.True(t, resp.Uncompressed, "expected uncompressed body")

b, err := io.ReadAll(resp.Body)
require.NoError(t, err)

routes, err := eskip.Parse(string(b))
require.NoError(t, err)
assert.Len(t, routes, 3)
}

0 comments on commit a92ff60

Please sign in to comment.