Skip to content

Commit

Permalink
skipper: support http2 over cleartext TCP (h2c)
Browse files Browse the repository at this point in the history
Works around golang/go#26682 by introducing
ShutdownListener that tracks active connections and implements graceful
shutdown.

For #1253

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Jul 24, 2023
1 parent f35389e commit ff6f242
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 25 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type Config struct {
ExpectContinueTimeoutBackend time.Duration `yaml:"expect-continue-timeout-backend"`
MaxIdleConnsBackend int `yaml:"max-idle-connection-backend"`
DisableHTTPKeepalives bool `yaml:"disable-http-keepalives"`
EnableHttp2Cleartext bool `yaml:"enable-http2-cleartext"`

// swarm:
EnableSwarm bool `yaml:"enable-swarm"`
Expand Down Expand Up @@ -523,6 +524,7 @@ func NewConfig() *Config {
flag.IntVar(&cfg.MaxIdleConnsBackend, "max-idle-connection-backend", 0, "sets the maximum idle connections for all backend connections")
flag.BoolVar(&cfg.DisableHTTPKeepalives, "disable-http-keepalives", false, "forces backend to always create a new connection")
flag.BoolVar(&cfg.KubernetesEnableTLS, "kubernetes-enable-tls", false, "enable using kubnernetes resources to terminate tls")
flag.BoolVar(&cfg.EnableHttp2Cleartext, "enable-http2-cleartext", false, "enables HTTP/2 connections over cleartext TCP")

// Swarm:
flag.BoolVar(&cfg.EnableSwarm, "enable-swarm", false, "enable swarm communication between nodes in a skipper fleet")
Expand Down Expand Up @@ -850,6 +852,7 @@ func (c *Config) ToOptions() skipper.Options {
MaxIdleConnsBackend: c.MaxIdleConnsBackend,
DisableHTTPKeepalives: c.DisableHTTPKeepalives,
KubernetesEnableTLS: c.KubernetesEnableTLS,
EnableHttp2Cleartext: c.EnableHttp2Cleartext,

// swarm:
EnableSwarm: c.EnableSwarm,
Expand Down
81 changes: 81 additions & 0 deletions net/shutdown_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package net

import (
"context"
"net"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
)

type (
ShutdownListener struct {
net.Listener
activeConns atomic.Int64
}

shutdownListenerConn struct {
net.Conn
listener *ShutdownListener
closeOnce sync.Once
}
)

var _ net.Listener = &ShutdownListener{}

func NewShutdownListener(l net.Listener) *ShutdownListener {
return &ShutdownListener{Listener: l}
}

func (l *ShutdownListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}

l.registerConn()

return &shutdownListenerConn{Conn: c, listener: l}, nil
}

func (l *ShutdownListener) Close() error {
err := l.Listener.Close()
return err
}

func (l *ShutdownListener) Shutdown(ctx context.Context) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
n := l.activeConns.Load()
log.Debugf("ShutdownListener Shutdown: %d connections", n)
if n == 0 {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}

func (c *shutdownListenerConn) Close() error {
err := c.Conn.Close()

c.closeOnce.Do(func() { c.listener.unregisterConn() })

return err
}

func (l *ShutdownListener) registerConn() {
n := l.activeConns.Add(1)
log.Debugf("ShutdownListener registerConn: %d connections", n)
}

func (l *ShutdownListener) unregisterConn() {
n := l.activeConns.Add(-1)
log.Debugf("ShutdownListener unregisterConn: %d connections", n)
}
50 changes: 41 additions & 9 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/zalando/skipper/circuit"
"github.com/zalando/skipper/dataclients/kubernetes"
Expand All @@ -42,7 +44,7 @@ import (
"github.com/zalando/skipper/loadbalancer"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/metrics"
skpnet "github.com/zalando/skipper/net"
snet "github.com/zalando/skipper/net"
pauth "github.com/zalando/skipper/predicates/auth"
"github.com/zalando/skipper/predicates/content"
"github.com/zalando/skipper/predicates/cookie"
Expand Down Expand Up @@ -381,6 +383,9 @@ type Options struct {
// a backend to always create a new connection.
DisableHTTPKeepalives bool

// EnableHttp2Cleartext enables HTTP/2 connections over cleartext TCP.
EnableHttp2Cleartext bool

// Flag indicating to ignore trailing slashes in paths during route
// lookup.
IgnoreTrailingSlash bool
Expand Down Expand Up @@ -1107,6 +1112,10 @@ func (o *Options) tlsConfig(cr *certregistry.CertRegistry) (*tls.Config, error)
return nil, nil
}

if o.EnableHttp2Cleartext {
return nil, fmt.Errorf("HTTP/2 connections over cleartext TCP are not supported when TLS is enabled")
}

config := &tls.Config{
MinVersion: o.TLSMinVersion,
}
Expand Down Expand Up @@ -1233,11 +1242,27 @@ func listenAndServeQuit(
}
}

if o.EnableHttp2Cleartext {
h2srv := &http2.Server{}
srv.Handler = h2c.NewHandler(srv.Handler, h2srv)

// Work around https://github.com/golang/go/issues/26682
// http2.ConfigureServer registers unexported h2srv graceful shutdown handler on srv shutdown -
// it calls srv.RegisterOnShutdown(h2srv.state.startGracefulShutdown).
// h2srv graceful shutdown handler sends GOAWAY frame to all connections and closes them after predefined delay.
//
// srv.Shutdown() runs h2srv shutdown handler in a goroutine so a special snet.ShutdownListener
// waits until all connections are closed.
http2.ConfigureServer(srv, h2srv)
}

log.Infof("Listen on %v", address)

l, err := listen(o, address, mtr)
if err != nil {
var listener *snet.ShutdownListener
if l, err := listen(o, address, mtr); err != nil {
return err
} else {
listener = snet.NewShutdownListener(l)
}

// making idleConnsCH and sigs optional parameters is required to be able to tear down a server
Expand All @@ -1258,10 +1283,16 @@ func listenAndServeQuit(
log.Infof("Got shutdown signal, wait %v for health check", o.WaitForHealthcheckInterval)
time.Sleep(o.WaitForHealthcheckInterval)

log.Info("Start shutdown")
log.Info("Start server shutdown")
if err := srv.Shutdown(context.Background()); err != nil {
log.Errorf("Failed to graceful shutdown: %v", err)
log.Errorf("Failed to gracefully shutdown: %v", err)
}

log.Info("Start listener shutdown")
if err := listener.Shutdown(context.Background()); err != nil {
log.Errorf("Failed to gracefully shutdown listener: %v", err)
}

close(idleConnsCH)
}()

Expand All @@ -1281,20 +1312,21 @@ func listenAndServeQuit(
}()
}

if err := srv.ServeTLS(l, "", ""); err != http.ErrServerClosed {
if err := srv.ServeTLS(listener, "", ""); err != http.ErrServerClosed {
log.Errorf("ServeTLS failed: %v", err)
return err
}
} else {
log.Infof("TLS settings not found, defaulting to HTTP")

if err := srv.Serve(l); err != http.ErrServerClosed {
if err := srv.Serve(listener); err != http.ErrServerClosed {
log.Errorf("Serve failed: %v", err)
return err
}
}

<-idleConnsCH

log.Infof("done.")
return nil
}
Expand Down Expand Up @@ -1580,13 +1612,13 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
}

var swarmer ratelimit.Swarmer
var redisOptions *skpnet.RedisOptions
var redisOptions *snet.RedisOptions
log.Infof("enable swarm: %v", o.EnableSwarm)
if o.EnableSwarm {
if len(o.SwarmRedisURLs) > 0 || o.KubernetesRedisServiceName != "" || o.SwarmRedisEndpointsRemoteURL != "" {
log.Infof("Redis based swarm with %d shards", len(o.SwarmRedisURLs))

redisOptions = &skpnet.RedisOptions{
redisOptions = &snet.RedisOptions{
Addrs: o.SwarmRedisURLs,
Password: o.SwarmRedisPassword,
HashAlgorithm: o.SwarmRedisHashAlgorithm,
Expand Down
Loading

0 comments on commit ff6f242

Please sign in to comment.