Skip to content

Commit

Permalink
[kube] support SPDY over websocket protocol
Browse files Browse the repository at this point in the history
SPDY has been deprecated for several years, and most proxies are expected to lose support in the coming months. To address this issue, Kubernetes introduced SPDY over WebSocket connections. This protocol leverages a WebSocket upgrade, but once established, it functions as a simple connection with SPDY-based framing.

This pull request (PR) introduces initial support for customer-facing upgrades. Future PRs will add support for teleport-to-teleport communication using the `SPDY/3.1+portforward.k8s.io` protocol.
  • Loading branch information
tigrato authored and github-actions committed Sep 20, 2024
1 parent 7aeb9f9 commit d627f08
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 42 deletions.
2 changes: 1 addition & 1 deletion fixtures/ci-teleport-rbac/ci-teleport.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ rules:
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/portforward"]
verbs: ["create"]
verbs: ["create", "get"]
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/ephemeralcontainers"]
Expand Down
125 changes: 86 additions & 39 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,34 +505,6 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)

// forward local port to target port 80 of the nginx container
localPort := newPortValue()

forwarder, err := newPortForwarder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()
defer func() {
assert.NoError(t, <-forwarderCh, "Forward ports exited with error")
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
defer close(forwarder.stopC)

resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())

// impersonating client requests will bse denied
_, impersonatingProxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{
T: teleport,
Expand All @@ -542,18 +514,72 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)

localPort = newPortValue()
impersonatingForwarder, err := newPortForwarder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
tests := []struct {
name string
builder func(*rest.Config, kubePortForwardArgs) (*kubePortForwarder, error)
}{
{
name: "SPDY portForwarder",
builder: newPortForwarder,
},
{
name: "SPDY over Websocket portForwarder",
builder: newPortForwarderSPDYOverWebsocket,
},
}

for _, tt := range tests {
t.Run(tt.name,
func(t *testing.T) {
// forward local port to target port 80 of the nginx container
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, listener.Close())
})

localPort := listener.Addr().(*net.TCPAddr).Port

forwarder, err := tt.builder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
t.Cleanup(func() {})

resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())

close(forwarder.stopC)
require.NoError(t, <-forwarderCh, "Forward ports exited with error")

impersonatingForwarder, err := tt.builder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*|.*403 Forbidden.*", err.Error())
},
)
}

// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*", err.Error())
}

// TestKubeTrustedClustersClientCert tests scenario with trusted clusters
Expand Down Expand Up @@ -1742,6 +1768,27 @@ type kubePortForwarder struct {
readyC chan struct{}
}

func newPortForwarderSPDYOverWebsocket(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {
return nil, trace.Wrap(err)
}
u.Scheme = "https"
u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v/portforward", args.podNamespace, args.podName)

tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(u, kubeConfig)
if err != nil {
return nil, trace.Wrap(err)
}

stopC, readyC := make(chan struct{}), make(chan struct{})
fwd, err := portforward.New(tunnelingDialer, args.ports, stopC, readyC, nil, nil)
if err != nil {
return nil, trace.Wrap(err)
}
return &kubePortForwarder{PortForwarder: fwd, stopC: stopC, readyC: readyC}, nil
}

func newPortForwarder(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {
Expand Down
31 changes: 29 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/tools/remotecommand"
Expand Down Expand Up @@ -1830,10 +1831,14 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
// Go client uses SPDY while other clients still require WebSockets.
// This function will run until the end of the execution of the request.
func runPortForwarding(req portForwardRequest) error {
if wsstream.IsWebSocketRequest(req.httpRequest) {
switch {
case wsstream.IsWebSocketRequestWithTunnelingProtocol(req.httpRequest):
return trace.Wrap(runPortForwardingTunneledHTTPStreams(req))
case wsstream.IsWebSocketRequest(req.httpRequest):
return trace.Wrap(runPortForwardingWebSocket(req))
default:
return trace.Wrap(runPortForwardingHTTPStreams(req))
}
return trace.Wrap(runPortForwardingHTTPStreams(req))
}

const (
Expand Down Expand Up @@ -2110,6 +2115,7 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return nil, trace.Wrap(err)
}

req = createSPDYRequest(req, PortForwardProtocolV1Name)
upgradeRoundTripper := NewSpdyRoundTripperWithDialer(roundTripperConfig{
ctx: req.Context(),
sess: sess,
Expand All @@ -2135,6 +2141,27 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil
}

// createSPDYRequest modifies the passed request to remove
// WebSockets headers and add SPDY upgrade information, including
// spdy protocols acceptable to the client.
func createSPDYRequest(req *http.Request, spdyProtocols ...string) *http.Request {
clone := req.Clone(req.Context())
// Clean up the websocket headers from the http request.
clone.Header.Del(wsstream.WebSocketProtocolHeader)
clone.Header.Del("Sec-Websocket-Key")
clone.Header.Del("Sec-Websocket-Version")
clone.Header.Del(httpstream.HeaderUpgrade)
// Update the http request for an upstream SPDY upgrade.
clone.Method = "POST"
clone.Body = nil // Remove the request body which is unused.
clone.Header.Set(httpstream.HeaderUpgrade, httpstreamspdy.HeaderSpdy31)
clone.Header.Del(httpstream.HeaderProtocolVersion)
for i := range spdyProtocols {
clone.Header.Add(httpstream.HeaderProtocolVersion, spdyProtocols[i])
}
return clone
}

// clusterSession contains authenticated user session to the target cluster:
// x509 short lived credentials, forwarding proxies and other data
type clusterSession struct {
Expand Down
60 changes: 60 additions & 0 deletions lib/kube/proxy/portforward_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import (
"strings"
"sync"

gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/httpstream"
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
portforwardconstants "k8s.io/apimachinery/pkg/util/portforward"
"k8s.io/client-go/tools/portforward"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -295,3 +299,59 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)

h.Debugf("Port forwarding pair completed.")
}

// runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol
// over WebSockets.
func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error {
targetConn, _, err := req.targetDialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return trace.Wrap(err, "error upgrading target connection")
}
defer targetConn.Close()

// Try to upgrade the websocket connection.
// Beyond this point, we don't need to write errors to the response.
upgrader := gwebsocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
Subprotocols: []string{portforwardconstants.WebsocketsSPDYTunnelingPortForwardV1},
}
conn, err := upgrader.Upgrade(req.httpResponseWriter, req.httpRequest, nil)
if err != nil {
return trace.Wrap(err)
}

tunneledConn := portforward.NewTunnelingConnection("server", conn)

streamChan := make(chan httpstream.Stream, 1)
spdyConn, err := spdystream.NewServerConnectionWithPings(
tunneledConn,
httpStreamReceived(req.context, streamChan),
req.pingPeriod,
)
if err != nil {
return trace.Wrap(err, "error upgrading connection")
}

if conn == nil {
return trace.ConnectionProblem(nil, "Unable to upgrade websocket connection")
}
defer conn.Close()

h := &portForwardProxy{
Entry: logrus.WithFields(logrus.Fields{
teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
events.RemoteAddr: req.httpRequest.RemoteAddr,
}),
portForwardRequest: req,
sourceConn: spdyConn,
streamChan: streamChan,
streamPairs: make(map[string]*httpStreamPair),
streamCreationTimeout: DefaultStreamCreationTimeout,
targetConn: targetConn,
}
defer h.Close()
h.Debugf("Setting port forwarding streaming connection idle timeout to %v", IdleTimeout)
spdyConn.SetIdleTimeout(IdleTimeout)
h.run()
return nil
}

0 comments on commit d627f08

Please sign in to comment.