From a5831b37b9cdb6f9fd20c2b4d9dbea343e76d2e9 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Thu, 8 Feb 2024 13:10:59 +0200 Subject: [PATCH] Allow access to the microcluster app from rest endpoints (#110) --- src/k8s/api/v1/cluster_node.go | 8 +- src/k8s/cmd/k8s/k8s_join_cluster.go | 2 +- src/k8s/pkg/k8s/client/cluster_node.go | 6 +- src/k8s/pkg/k8sd/api/cluster_join.go | 36 ++++ src/k8s/pkg/k8sd/api/cluster_node.go | 56 ------ .../k8sd/api/{tokens.go => cluster_tokens.go} | 34 +--- src/k8s/pkg/k8sd/api/endpoints.go | 170 +++++++++--------- src/k8s/pkg/k8sd/api/handler.go | 22 +++ .../api/{cluster_config.go => kubeconfig.go} | 0 .../pkg/k8sd/api/kubernetes_auth_tokens.go | 4 +- src/k8s/pkg/k8sd/app/app.go | 2 +- 11 files changed, 165 insertions(+), 175 deletions(-) create mode 100644 src/k8s/pkg/k8sd/api/cluster_join.go delete mode 100644 src/k8s/pkg/k8sd/api/cluster_node.go rename src/k8s/pkg/k8sd/api/{tokens.go => cluster_tokens.go} (64%) create mode 100644 src/k8s/pkg/k8sd/api/handler.go rename src/k8s/pkg/k8sd/api/{cluster_config.go => kubeconfig.go} (100%) diff --git a/src/k8s/api/v1/cluster_node.go b/src/k8s/api/v1/cluster_node.go index 6d1c24b15..052fbf1a3 100644 --- a/src/k8s/api/v1/cluster_node.go +++ b/src/k8s/api/v1/cluster_node.go @@ -1,11 +1,11 @@ package v1 -// JoinNodeRequest is used to request to add a node to the cluster. -type JoinNodeRequest struct { +// JoinClusterRequest is used to request to add a node to the cluster. +type JoinClusterRequest struct { Name string `json:"name"` Address string `json:"address"` Token string `json:"token"` } -// JoinNodeResponse is the response from "POST 1.0/k8sd/cluster/{node}" -type JoinNodeResponse struct{} +// JoinClusterResponse is the response from "POST 1.0/k8sd/cluster/{node}" +type JoinClusterResponse struct{} diff --git a/src/k8s/cmd/k8s/k8s_join_cluster.go b/src/k8s/cmd/k8s/k8s_join_cluster.go index ccaaea2de..367d23da9 100644 --- a/src/k8s/cmd/k8s/k8s_join_cluster.go +++ b/src/k8s/cmd/k8s/k8s_join_cluster.go @@ -62,7 +62,7 @@ var ( timeoutCtx, cancel := context.WithTimeout(cmd.Context(), joinNodeCmdOpts.timeout) defer cancel() - err = c.JoinNode(timeoutCtx, joinNodeCmdOpts.name, joinNodeCmdOpts.address, token) + err = c.JoinCluster(timeoutCtx, joinNodeCmdOpts.name, joinNodeCmdOpts.address, token) if err != nil { return fmt.Errorf("failed to join cluster: %w", err) } diff --git a/src/k8s/pkg/k8s/client/cluster_node.go b/src/k8s/pkg/k8s/client/cluster_node.go index cd6bdabc4..2f2b667f8 100644 --- a/src/k8s/pkg/k8s/client/cluster_node.go +++ b/src/k8s/pkg/k8s/client/cluster_node.go @@ -11,17 +11,17 @@ import ( "github.com/canonical/lxd/shared/api" ) -func (c *Client) JoinNode(ctx context.Context, name string, address string, token string) error { +func (c *Client) JoinCluster(ctx context.Context, name string, address string, token string) error { if err := c.m.Ready(30); err != nil { return fmt.Errorf("cluster did not come up in time: %w", err) } - request := apiv1.JoinNodeRequest{ + request := apiv1.JoinClusterRequest{ Name: name, Address: address, Token: token, } - var response apiv1.JoinNodeResponse + var response apiv1.JoinClusterResponse err := c.mc.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "join"), request, &response) if err != nil { fmt.Fprintln(os.Stderr, "failed to join node - cleaning up now") diff --git a/src/k8s/pkg/k8sd/api/cluster_join.go b/src/k8s/pkg/k8sd/api/cluster_join.go new file mode 100644 index 000000000..3c5268e27 --- /dev/null +++ b/src/k8s/pkg/k8sd/api/cluster_join.go @@ -0,0 +1,36 @@ +package api + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + apiv1 "github.com/canonical/k8s/api/v1" + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/lxd/lxd/response" + "github.com/canonical/microcluster/microcluster" + "github.com/canonical/microcluster/state" +) + +func postClusterJoin(m *microcluster.MicroCluster, s *state.State, r *http.Request) response.Response { + req := apiv1.JoinClusterRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) + } + + // differentiate between control plane and worker node tokens + info := &types.InternalWorkerNodeToken{} + if info.Decode(req.Token) == nil { + // valid worker node token + if err := m.NewCluster(req.Name, req.Address, map[string]string{"workerToken": req.Token}, time.Second*180); err != nil { + return response.InternalError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err)) + } + } else { + if err := m.JoinCluster(req.Name, req.Address, req.Token, nil, time.Second*180); err != nil { + return response.InternalError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err)) + } + } + + return response.SyncResponse(true, &apiv1.JoinClusterResponse{}) +} diff --git a/src/k8s/pkg/k8sd/api/cluster_node.go b/src/k8s/pkg/k8sd/api/cluster_node.go deleted file mode 100644 index abfea4ef0..000000000 --- a/src/k8s/pkg/k8sd/api/cluster_node.go +++ /dev/null @@ -1,56 +0,0 @@ -package api - -import ( - "encoding/json" - "fmt" - "net/http" - "time" - - apiv1 "github.com/canonical/k8s/api/v1" - "github.com/canonical/k8s/pkg/k8sd/types" - "github.com/canonical/lxd/lxd/response" - "github.com/canonical/microcluster/microcluster" - "github.com/canonical/microcluster/state" -) - -func postClusterNode(s *state.State, r *http.Request) response.Response { - req := apiv1.JoinNodeRequest{} - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) - } - - // differentiate between control plane and worker node tokens - info := &types.InternalWorkerNodeToken{} - if info.Decode(req.Token) == nil { - // valid worker node token - if err := joinWorkerNode(s, r, req.Name, req.Address, req.Token); err != nil { - return response.SmartError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err)) - } - } else { - if err := joinControlPlaneNode(s, r, req.Name, req.Address, req.Token); err != nil { - return response.SmartError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err)) - } - } - - return response.SyncResponse(true, &apiv1.JoinNodeResponse{}) -} - -func joinWorkerNode(s *state.State, r *http.Request, name, address, token string) error { - m, err := microcluster.App(r.Context(), microcluster.Args{ - StateDir: s.OS.StateDir, - }) - if err != nil { - return fmt.Errorf("failed to get microcluster app: %w", err) - } - return m.NewCluster(name, address, map[string]string{"workerToken": token}, time.Second*180) -} - -func joinControlPlaneNode(s *state.State, r *http.Request, name, address, token string) error { - m, err := microcluster.App(r.Context(), microcluster.Args{ - StateDir: s.OS.StateDir, - }) - if err != nil { - return fmt.Errorf("failed to get microcluster app: %w", err) - } - return m.JoinCluster(name, address, token, nil, time.Second*180) -} diff --git a/src/k8s/pkg/k8sd/api/tokens.go b/src/k8s/pkg/k8sd/api/cluster_tokens.go similarity index 64% rename from src/k8s/pkg/k8sd/api/tokens.go rename to src/k8s/pkg/k8sd/api/cluster_tokens.go index b53c4d967..6d39c8688 100644 --- a/src/k8s/pkg/k8sd/api/tokens.go +++ b/src/k8s/pkg/k8sd/api/cluster_tokens.go @@ -15,29 +15,29 @@ import ( "github.com/canonical/microcluster/state" ) -func postTokens(s *state.State, r *http.Request) response.Response { +func postClusterTokens(m *microcluster.MicroCluster, s *state.State, r *http.Request) response.Response { req := apiv1.TokenRequest{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) } - var token string - var err error + var ( + token string + err error + ) if req.Worker { - token, err = createWorkerToken(s, r) + token, err = createWorkerToken(s) } else { - token, err = createControlPlaneToken(s, r, req.Name) + token, err = m.NewJoinToken(req.Name) } - if err != nil { - return response.SmartError(fmt.Errorf("failed to create token: %w", err)) + return response.InternalError(fmt.Errorf("failed to create token: %w", err)) } return response.SyncResponse(true, &apiv1.TokensResponse{EncodedToken: token}) - } -func createWorkerToken(s *state.State, r *http.Request) (string, error) { +func createWorkerToken(s *state.State) (string, error) { var token string if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { var err error @@ -67,19 +67,3 @@ func createWorkerToken(s *state.State, r *http.Request) (string, error) { return token, nil } - -func createControlPlaneToken(s *state.State, r *http.Request, name string) (string, error) { - m, err := microcluster.App(r.Context(), microcluster.Args{ - StateDir: s.OS.StateDir, - }) - if err != nil { - return "", fmt.Errorf("failed to get microcluster app: %w", err) - } - - c, err := m.LocalClient() - if err != nil { - return "", fmt.Errorf("failed to get local microcluster client: %w", err) - } - - return c.RequestToken(r.Context(), name) -} diff --git a/src/k8s/pkg/k8sd/api/endpoints.go b/src/k8s/pkg/k8sd/api/endpoints.go index 48ff60d3c..d08f67ecd 100644 --- a/src/k8s/pkg/k8sd/api/endpoints.go +++ b/src/k8s/pkg/k8sd/api/endpoints.go @@ -2,90 +2,94 @@ package api import ( + "github.com/canonical/microcluster/microcluster" "github.com/canonical/microcluster/rest" ) -var Endpoints = []rest.Endpoint{ - // Cluster status - { - Name: "ClusterStatus", - Path: "k8sd/cluster", - Get: rest.EndpointAction{Handler: getClusterStatus}, - }, - // Clustering - // Unified token endpoint for both, control-plane and worker-node. - { - Name: "Tokens", - Path: "k8sd/cluster/tokens", - Post: rest.EndpointAction{Handler: postTokens}, - }, - { - Name: "JoinNode", - Path: "k8sd/cluster/join", - Post: rest.EndpointAction{Handler: postClusterNode}, - // Joining a node is a bootstrapping action which needs to be available before k8sd is initialized. - AllowedBeforeInit: true, - }, - // Worker nodes - { - Name: "WorkerInfo", - Path: "k8sd/worker/info", - // This endpoint is used by worker nodes that are not part of the microcluster. - // We authenticate by passing a token through an HTTP header instead. - Post: rest.EndpointAction{Handler: postWorkerInfo, AllowUntrusted: true}, - }, - // Kubeconfig - { - Name: "Kubeconfig", - Path: "k8sd/kubeconfig", - Get: rest.EndpointAction{Handler: getKubeconfig}, - }, - // Cluster components - { - Name: "Components", - Path: "k8sd/components", - Get: rest.EndpointAction{Handler: getComponents}, - }, - { - Name: "DNSComponent", - Path: "k8sd/components/dns", - Put: rest.EndpointAction{Handler: putDNSComponent}, - }, - { - Name: "NetworkComponent", - Path: "k8sd/components/network", - Put: rest.EndpointAction{Handler: putNetworkComponent}, - }, - { - Name: "StorageComponent", - Path: "k8sd/components/storage", - Put: rest.EndpointAction{Handler: putStorageComponent}, - }, - { - Name: "IngressComponent", - Path: "k8sd/components/ingress", - Put: rest.EndpointAction{Handler: putIngressComponent}, - }, - { - Name: "GatewayComponent", - Path: "k8sd/components/gateway", - Put: rest.EndpointAction{Handler: putGatewayComponent}, - }, - { - Name: "LoadBalancerComponent", - Path: "k8sd/components/loadbalancer", - Put: rest.EndpointAction{Handler: putLoadBalancerComponent}, - }, - // Kubernetes auth tokens and token review webhook for kube-apiserver - { - Name: "KubernetesAuthTokens", - Path: "kubernetes/auth/tokens", - Get: rest.EndpointAction{Handler: getKubernetesAuthToken, AllowUntrusted: true}, - Post: rest.EndpointAction{Handler: postKubernetesAuthToken}, - }, - { - Name: "KubernetesAuthWebhook", - Path: "kubernetes/auth/webhook", - Post: rest.EndpointAction{Handler: postKubernetesAuthWebhook, AllowUntrusted: true}, - }, +// Endpoints returns the list of endpoints for a given microcluster app. +func Endpoints(app *microcluster.MicroCluster) []rest.Endpoint { + return []rest.Endpoint{ + // Cluster status + { + Name: "ClusterStatus", + Path: "k8sd/cluster", + Get: rest.EndpointAction{Handler: getClusterStatus}, + }, + // Clustering + // Unified token endpoint for both, control-plane and worker-node. + { + Name: "ClusterTokens", + Path: "k8sd/cluster/tokens", + Post: rest.EndpointAction{Handler: wrapHandlerWithMicroCluster(app, postClusterTokens)}, + }, + { + Name: "ClusterJoin", + Path: "k8sd/cluster/join", + Post: rest.EndpointAction{Handler: wrapHandlerWithMicroCluster(app, postClusterJoin)}, + // Joining a node is a bootstrapping action which needs to be available before k8sd is initialized. + AllowedBeforeInit: true, + }, + // Worker nodes + { + Name: "WorkerInfo", + Path: "k8sd/worker/info", + // This endpoint is used by worker nodes that are not part of the microcluster. + // We authenticate by passing a token through an HTTP header instead. + Post: rest.EndpointAction{Handler: postWorkerInfo, AllowUntrusted: true}, + }, + // Kubeconfig + { + Name: "Kubeconfig", + Path: "k8sd/kubeconfig", + Get: rest.EndpointAction{Handler: getKubeconfig}, + }, + // Cluster components + { + Name: "Components", + Path: "k8sd/components", + Get: rest.EndpointAction{Handler: getComponents}, + }, + { + Name: "DNSComponent", + Path: "k8sd/components/dns", + Put: rest.EndpointAction{Handler: putDNSComponent}, + }, + { + Name: "NetworkComponent", + Path: "k8sd/components/network", + Put: rest.EndpointAction{Handler: putNetworkComponent}, + }, + { + Name: "StorageComponent", + Path: "k8sd/components/storage", + Put: rest.EndpointAction{Handler: putStorageComponent}, + }, + { + Name: "IngressComponent", + Path: "k8sd/components/ingress", + Put: rest.EndpointAction{Handler: putIngressComponent}, + }, + { + Name: "GatewayComponent", + Path: "k8sd/components/gateway", + Put: rest.EndpointAction{Handler: putGatewayComponent}, + }, + { + Name: "LoadBalancerComponent", + Path: "k8sd/components/loadbalancer", + Put: rest.EndpointAction{Handler: putLoadBalancerComponent}, + }, + // Kubernetes auth tokens and token review webhook for kube-apiserver + { + Name: "KubernetesAuthTokens", + Path: "kubernetes/auth/tokens", + Get: rest.EndpointAction{Handler: getKubernetesAuthTokens, AllowUntrusted: true}, + Post: rest.EndpointAction{Handler: postKubernetesAuthTokens}, + }, + { + Name: "KubernetesAuthWebhook", + Path: "kubernetes/auth/webhook", + Post: rest.EndpointAction{Handler: postKubernetesAuthWebhook, AllowUntrusted: true}, + }, + } } diff --git a/src/k8s/pkg/k8sd/api/handler.go b/src/k8s/pkg/k8sd/api/handler.go new file mode 100644 index 000000000..2b6d7b00f --- /dev/null +++ b/src/k8s/pkg/k8sd/api/handler.go @@ -0,0 +1,22 @@ +package api + +import ( + "net/http" + + "github.com/canonical/lxd/lxd/response" + "github.com/canonical/microcluster/microcluster" + "github.com/canonical/microcluster/state" +) + +// handler is the handler type for microcluster endpoints. +type handler func(*state.State, *http.Request) response.Response + +// handlerWithMicroCluster is the handler type for endpoints that also need access to the microcluster instance. +type handlerWithMicroCluster func(*microcluster.MicroCluster, *state.State, *http.Request) response.Response + +// wrapHandlerWithMicroCluster creates a microcluster handler from a handlerWithMicroCluster by capturing the microcluster instance. +func wrapHandlerWithMicroCluster(m *microcluster.MicroCluster, handler handlerWithMicroCluster) handler { + return func(s *state.State, r *http.Request) response.Response { + return handler(m, s, r) + } +} diff --git a/src/k8s/pkg/k8sd/api/cluster_config.go b/src/k8s/pkg/k8sd/api/kubeconfig.go similarity index 100% rename from src/k8s/pkg/k8sd/api/cluster_config.go rename to src/k8s/pkg/k8sd/api/kubeconfig.go diff --git a/src/k8s/pkg/k8sd/api/kubernetes_auth_tokens.go b/src/k8s/pkg/k8sd/api/kubernetes_auth_tokens.go index 84d729195..9635b7131 100644 --- a/src/k8s/pkg/k8sd/api/kubernetes_auth_tokens.go +++ b/src/k8s/pkg/k8sd/api/kubernetes_auth_tokens.go @@ -16,7 +16,7 @@ import ( "github.com/canonical/microcluster/state" ) -func getKubernetesAuthToken(state *state.State, r *http.Request) response.Response { +func getKubernetesAuthTokens(state *state.State, r *http.Request) response.Response { token := r.Header.Get("token") var username string @@ -32,7 +32,7 @@ func getKubernetesAuthToken(state *state.State, r *http.Request) response.Respon return response.SyncResponse(true, apiv1.CheckKubernetesAuthTokenResponse{Username: username, Groups: groups}) } -func postKubernetesAuthToken(state *state.State, r *http.Request) response.Response { +func postKubernetesAuthTokens(state *state.State, r *http.Request) response.Response { request := apiv1.CreateKubernetesAuthTokenRequest{} if err := json.NewDecoder(r.Body).Decode(&request); err != nil { return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) diff --git a/src/k8s/pkg/k8sd/app/app.go b/src/k8s/pkg/k8sd/app/app.go index 5fff5445f..bd1c908ff 100644 --- a/src/k8s/pkg/k8sd/app/app.go +++ b/src/k8s/pkg/k8sd/app/app.go @@ -68,7 +68,7 @@ func (a *App) Run(customHooks *config.Hooks) error { hooks.PreRemove = customHooks.PreRemove } } - err := a.MicroCluster.Start(api.Endpoints, database.SchemaExtensions, hooks) + err := a.MicroCluster.Start(api.Endpoints(a.MicroCluster), database.SchemaExtensions, hooks) if err != nil { return fmt.Errorf("failed to run microcluster: %w", err) }