Skip to content

Commit

Permalink
apiserver: Adding RemoteProxyREST for pass-through any requests
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <[email protected]>
  • Loading branch information
Iceber committed Dec 27, 2024
1 parent 49eb28a commit 239fd56
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 136 deletions.
11 changes: 6 additions & 5 deletions pkg/kubeapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery"
podrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/pod"
proxyrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/proxy"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
)
Expand Down Expand Up @@ -140,16 +140,17 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)

controller := NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())

for _, rest := range podrest.GetPodSubresourceRESTs(controller) {
for _, rest := range proxyrest.GetSubresourceRESTs(controller) {
restManager.preRegisterSubresource(subresource{
gr: schema.GroupResource{Group: "", Resource: "pods"},
kind: "Pod",
namespaced: true,
gr: rest.ParentGroupResource(),
kind: rest.ParentKind(),
namespaced: rest.Namespaced(),

name: rest.Subresource(),
connecter: rest,
})
}
resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, controller)
return genericserver, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kubeapiserver/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type ResourceHandler struct {
minRequestTimeout time.Duration
delegate http.Handler
proxy http.Handler

rest *RESTManager
discovery *discovery.DiscoveryManager
Expand Down
131 changes: 0 additions & 131 deletions pkg/kubeapiserver/resourcerest/pod/subresource.go

This file was deleted.

93 changes: 93 additions & 0 deletions pkg/kubeapiserver/resourcerest/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package proxy

import (
"context"
"errors"
"net/http"
"net/url"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"

"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
)

type ClusterConnectionGetter interface {
GetClusterDefaultConnection(ctx context.Context, cluster string) (string, http.RoundTripper, error)
GetClusterConnectionWithTLSConfig(ctx context.Context, cluster string) (string, http.RoundTripper, error)
}

type RemoteProxyREST struct {
serializer runtime.NegotiatedSerializer
connGetter ClusterConnectionGetter
}

func NewRemoteProxyREST(serializer runtime.NegotiatedSerializer, connGetter ClusterConnectionGetter) http.Handler {
return &RemoteProxyREST{serializer: serializer, connGetter: connGetter}
}

func (r *RemoteProxyREST) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
handler, err := proxyConn(req.Context(), r.connGetter, false, r, nil)
if err != nil {
r.Error(rw, req, err)
}
handler.ServeHTTP(rw, req)
}

func (r *RemoteProxyREST) Error(w http.ResponseWriter, req *http.Request, err error) {
responsewriters.ErrorNegotiated(err, r.serializer, schema.GroupVersion{}, w, req)
}

func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeRequired bool, responder proxy.ErrorResponder, wrapProxy func(*proxy.UpgradeAwareHandler) http.Handler) (http.Handler, error) {
clusterName := request.ClusterNameValue(ctx)
if clusterName == "" {
return nil, errors.New("missing cluster")
}

requestInfo, ok := genericrequest.RequestInfoFrom(ctx)
if !ok {
return nil, errors.New("missing RequestInfo")
}

// TODO(iceber): need disconnect when the cluster authentication information changes
endpoint, transport, err := connGetter.GetClusterDefaultConnection(ctx, clusterName)
if err != nil {
return nil, err
}

target, err := url.ParseRequestURI(endpoint + requestInfo.Path)
if err != nil {
return nil, err
}
target.RawQuery = request.RequestQueryFrom(ctx).Encode()

proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder)
proxy.UseLocationHost = true

var handler http.Handler = proxy
if wrapProxy != nil {
handler = wrapProxy(proxy)
}
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
r := req.WithContext(req.Context())
r.Header = utilnet.CloneHeader(req.Header)
if auditID, _ := audit.AuditIDFrom(ctx); auditID != "" {
req.Header.Set(auditinternal.HeaderAuditID, string(auditID))
}

handler.ServeHTTP(rw, req)

// merge headers
for _, header := range []string{"Cache-Control", auditinternal.HeaderAuditID} {
if vs := rw.Header().Values(header); len(vs) > 1 {
rw.Header().Set(header, vs[0])
}
}
}), nil
}
110 changes: 110 additions & 0 deletions pkg/kubeapiserver/resourcerest/proxy/subresources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package proxy

import (
"context"
"net/http"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/registry/rest"
registryrest "k8s.io/apiserver/pkg/registry/rest"
api "k8s.io/kubernetes/pkg/apis/core"
)

func GetSubresourceRESTs(connGetter ClusterConnectionGetter) []*PodSubresourceRemoteProxyREST {
return []*PodSubresourceRemoteProxyREST{
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "attach",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodAttachOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "exec",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodExecOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "portforward",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodPortForwardOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "log",
methods: []string{"GET"},
upgradeRequired: false,
options: &api.PodLogOptions{},
connGetter: connGetter,
},
}
}

type PodSubresourceRemoteProxyREST struct {
parent schema.GroupResource
namespaced bool
parentKind string

subresource string

methods []string
options runtime.Object

upgradeRequired bool
connGetter ClusterConnectionGetter
}

var _ rest.Storage = &PodSubresourceRemoteProxyREST{}
var _ rest.Connecter = &PodSubresourceRemoteProxyREST{}

func (r *PodSubresourceRemoteProxyREST) ParentGroupResource() schema.GroupResource {
return r.parent
}

func (r *PodSubresourceRemoteProxyREST) ParentKind() string {
return r.parentKind
}

func (r *PodSubresourceRemoteProxyREST) Namespaced() bool {
return r.namespaced
}

func (r *PodSubresourceRemoteProxyREST) Subresource() string {
return r.subresource
}

func (r *PodSubresourceRemoteProxyREST) New() runtime.Object {
return r.options.DeepCopyObject()
}

func (r *PodSubresourceRemoteProxyREST) Destroy() {
}

func (r *PodSubresourceRemoteProxyREST) NewConnectOptions() (runtime.Object, bool, string) {
return r.options.DeepCopyObject(), false, ""
}

func (r *PodSubresourceRemoteProxyREST) ConnectMethods() []string {
return r.methods
}

func (r *PodSubresourceRemoteProxyREST) Connect(ctx context.Context, name string, opts runtime.Object, responder registryrest.Responder) (http.Handler, error) {
return proxyConn(ctx, r.connGetter, r.upgradeRequired, proxy.NewErrorResponder(responder), nil)
}

0 comments on commit 239fd56

Please sign in to comment.