Skip to content

Commit

Permalink
Allow access to the microcluster app from rest endpoints (canonical#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
neoaggelos authored Feb 8, 2024
1 parent a787f52 commit a5831b3
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 175 deletions.
8 changes: 4 additions & 4 deletions src/k8s/api/v1/cluster_node.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package v1

// JoinNodeRequest is used to request to add a node to the cluster.
type JoinNodeRequest struct {
// JoinClusterRequest is used to request to add a node to the cluster.
type JoinClusterRequest struct {
Name string `json:"name"`
Address string `json:"address"`
Token string `json:"token"`
}

// JoinNodeResponse is the response from "POST 1.0/k8sd/cluster/{node}"
type JoinNodeResponse struct{}
// JoinClusterResponse is the response from "POST 1.0/k8sd/cluster/{node}"
type JoinClusterResponse struct{}
2 changes: 1 addition & 1 deletion src/k8s/cmd/k8s/k8s_join_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
timeoutCtx, cancel := context.WithTimeout(cmd.Context(), joinNodeCmdOpts.timeout)
defer cancel()

err = c.JoinNode(timeoutCtx, joinNodeCmdOpts.name, joinNodeCmdOpts.address, token)
err = c.JoinCluster(timeoutCtx, joinNodeCmdOpts.name, joinNodeCmdOpts.address, token)
if err != nil {
return fmt.Errorf("failed to join cluster: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions src/k8s/pkg/k8s/client/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import (
"github.com/canonical/lxd/shared/api"
)

func (c *Client) JoinNode(ctx context.Context, name string, address string, token string) error {
func (c *Client) JoinCluster(ctx context.Context, name string, address string, token string) error {
if err := c.m.Ready(30); err != nil {
return fmt.Errorf("cluster did not come up in time: %w", err)
}

request := apiv1.JoinNodeRequest{
request := apiv1.JoinClusterRequest{
Name: name,
Address: address,
Token: token,
}
var response apiv1.JoinNodeResponse
var response apiv1.JoinClusterResponse
err := c.mc.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "join"), request, &response)
if err != nil {
fmt.Fprintln(os.Stderr, "failed to join node - cleaning up now")
Expand Down
36 changes: 36 additions & 0 deletions src/k8s/pkg/k8sd/api/cluster_join.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package api

import (
"encoding/json"
"fmt"
"net/http"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)

func postClusterJoin(m *microcluster.MicroCluster, s *state.State, r *http.Request) response.Response {
req := apiv1.JoinClusterRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

// differentiate between control plane and worker node tokens
info := &types.InternalWorkerNodeToken{}
if info.Decode(req.Token) == nil {
// valid worker node token
if err := m.NewCluster(req.Name, req.Address, map[string]string{"workerToken": req.Token}, time.Second*180); err != nil {
return response.InternalError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err))
}
} else {
if err := m.JoinCluster(req.Name, req.Address, req.Token, nil, time.Second*180); err != nil {
return response.InternalError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err))
}
}

return response.SyncResponse(true, &apiv1.JoinClusterResponse{})
}
56 changes: 0 additions & 56 deletions src/k8s/pkg/k8sd/api/cluster_node.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@ import (
"github.com/canonical/microcluster/state"
)

func postTokens(s *state.State, r *http.Request) response.Response {
func postClusterTokens(m *microcluster.MicroCluster, s *state.State, r *http.Request) response.Response {
req := apiv1.TokenRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

var token string
var err error
var (
token string
err error
)
if req.Worker {
token, err = createWorkerToken(s, r)
token, err = createWorkerToken(s)
} else {
token, err = createControlPlaneToken(s, r, req.Name)
token, err = m.NewJoinToken(req.Name)
}

if err != nil {
return response.SmartError(fmt.Errorf("failed to create token: %w", err))
return response.InternalError(fmt.Errorf("failed to create token: %w", err))
}

return response.SyncResponse(true, &apiv1.TokensResponse{EncodedToken: token})

}

func createWorkerToken(s *state.State, r *http.Request) (string, error) {
func createWorkerToken(s *state.State) (string, error) {
var token string
if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
var err error
Expand Down Expand Up @@ -67,19 +67,3 @@ func createWorkerToken(s *state.State, r *http.Request) (string, error) {

return token, nil
}

func createControlPlaneToken(s *state.State, r *http.Request, name string) (string, error) {
m, err := microcluster.App(r.Context(), microcluster.Args{
StateDir: s.OS.StateDir,
})
if err != nil {
return "", fmt.Errorf("failed to get microcluster app: %w", err)
}

c, err := m.LocalClient()
if err != nil {
return "", fmt.Errorf("failed to get local microcluster client: %w", err)
}

return c.RequestToken(r.Context(), name)
}
170 changes: 87 additions & 83 deletions src/k8s/pkg/k8sd/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,90 +2,94 @@
package api

import (
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/rest"
)

var Endpoints = []rest.Endpoint{
// Cluster status
{
Name: "ClusterStatus",
Path: "k8sd/cluster",
Get: rest.EndpointAction{Handler: getClusterStatus},
},
// Clustering
// Unified token endpoint for both, control-plane and worker-node.
{
Name: "Tokens",
Path: "k8sd/cluster/tokens",
Post: rest.EndpointAction{Handler: postTokens},
},
{
Name: "JoinNode",
Path: "k8sd/cluster/join",
Post: rest.EndpointAction{Handler: postClusterNode},
// Joining a node is a bootstrapping action which needs to be available before k8sd is initialized.
AllowedBeforeInit: true,
},
// Worker nodes
{
Name: "WorkerInfo",
Path: "k8sd/worker/info",
// This endpoint is used by worker nodes that are not part of the microcluster.
// We authenticate by passing a token through an HTTP header instead.
Post: rest.EndpointAction{Handler: postWorkerInfo, AllowUntrusted: true},
},
// Kubeconfig
{
Name: "Kubeconfig",
Path: "k8sd/kubeconfig",
Get: rest.EndpointAction{Handler: getKubeconfig},
},
// Cluster components
{
Name: "Components",
Path: "k8sd/components",
Get: rest.EndpointAction{Handler: getComponents},
},
{
Name: "DNSComponent",
Path: "k8sd/components/dns",
Put: rest.EndpointAction{Handler: putDNSComponent},
},
{
Name: "NetworkComponent",
Path: "k8sd/components/network",
Put: rest.EndpointAction{Handler: putNetworkComponent},
},
{
Name: "StorageComponent",
Path: "k8sd/components/storage",
Put: rest.EndpointAction{Handler: putStorageComponent},
},
{
Name: "IngressComponent",
Path: "k8sd/components/ingress",
Put: rest.EndpointAction{Handler: putIngressComponent},
},
{
Name: "GatewayComponent",
Path: "k8sd/components/gateway",
Put: rest.EndpointAction{Handler: putGatewayComponent},
},
{
Name: "LoadBalancerComponent",
Path: "k8sd/components/loadbalancer",
Put: rest.EndpointAction{Handler: putLoadBalancerComponent},
},
// Kubernetes auth tokens and token review webhook for kube-apiserver
{
Name: "KubernetesAuthTokens",
Path: "kubernetes/auth/tokens",
Get: rest.EndpointAction{Handler: getKubernetesAuthToken, AllowUntrusted: true},
Post: rest.EndpointAction{Handler: postKubernetesAuthToken},
},
{
Name: "KubernetesAuthWebhook",
Path: "kubernetes/auth/webhook",
Post: rest.EndpointAction{Handler: postKubernetesAuthWebhook, AllowUntrusted: true},
},
// Endpoints returns the list of endpoints for a given microcluster app.
func Endpoints(app *microcluster.MicroCluster) []rest.Endpoint {
return []rest.Endpoint{
// Cluster status
{
Name: "ClusterStatus",
Path: "k8sd/cluster",
Get: rest.EndpointAction{Handler: getClusterStatus},
},
// Clustering
// Unified token endpoint for both, control-plane and worker-node.
{
Name: "ClusterTokens",
Path: "k8sd/cluster/tokens",
Post: rest.EndpointAction{Handler: wrapHandlerWithMicroCluster(app, postClusterTokens)},
},
{
Name: "ClusterJoin",
Path: "k8sd/cluster/join",
Post: rest.EndpointAction{Handler: wrapHandlerWithMicroCluster(app, postClusterJoin)},
// Joining a node is a bootstrapping action which needs to be available before k8sd is initialized.
AllowedBeforeInit: true,
},
// Worker nodes
{
Name: "WorkerInfo",
Path: "k8sd/worker/info",
// This endpoint is used by worker nodes that are not part of the microcluster.
// We authenticate by passing a token through an HTTP header instead.
Post: rest.EndpointAction{Handler: postWorkerInfo, AllowUntrusted: true},
},
// Kubeconfig
{
Name: "Kubeconfig",
Path: "k8sd/kubeconfig",
Get: rest.EndpointAction{Handler: getKubeconfig},
},
// Cluster components
{
Name: "Components",
Path: "k8sd/components",
Get: rest.EndpointAction{Handler: getComponents},
},
{
Name: "DNSComponent",
Path: "k8sd/components/dns",
Put: rest.EndpointAction{Handler: putDNSComponent},
},
{
Name: "NetworkComponent",
Path: "k8sd/components/network",
Put: rest.EndpointAction{Handler: putNetworkComponent},
},
{
Name: "StorageComponent",
Path: "k8sd/components/storage",
Put: rest.EndpointAction{Handler: putStorageComponent},
},
{
Name: "IngressComponent",
Path: "k8sd/components/ingress",
Put: rest.EndpointAction{Handler: putIngressComponent},
},
{
Name: "GatewayComponent",
Path: "k8sd/components/gateway",
Put: rest.EndpointAction{Handler: putGatewayComponent},
},
{
Name: "LoadBalancerComponent",
Path: "k8sd/components/loadbalancer",
Put: rest.EndpointAction{Handler: putLoadBalancerComponent},
},
// Kubernetes auth tokens and token review webhook for kube-apiserver
{
Name: "KubernetesAuthTokens",
Path: "kubernetes/auth/tokens",
Get: rest.EndpointAction{Handler: getKubernetesAuthTokens, AllowUntrusted: true},
Post: rest.EndpointAction{Handler: postKubernetesAuthTokens},
},
{
Name: "KubernetesAuthWebhook",
Path: "kubernetes/auth/webhook",
Post: rest.EndpointAction{Handler: postKubernetesAuthWebhook, AllowUntrusted: true},
},
}
}
22 changes: 22 additions & 0 deletions src/k8s/pkg/k8sd/api/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package api

import (
"net/http"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)

// handler is the handler type for microcluster endpoints.
type handler func(*state.State, *http.Request) response.Response

// handlerWithMicroCluster is the handler type for endpoints that also need access to the microcluster instance.
type handlerWithMicroCluster func(*microcluster.MicroCluster, *state.State, *http.Request) response.Response

// wrapHandlerWithMicroCluster creates a microcluster handler from a handlerWithMicroCluster by capturing the microcluster instance.
func wrapHandlerWithMicroCluster(m *microcluster.MicroCluster, handler handlerWithMicroCluster) handler {
return func(s *state.State, r *http.Request) response.Response {
return handler(m, s, r)
}
}
File renamed without changes.
Loading

0 comments on commit a5831b3

Please sign in to comment.