Skip to content

Commit

Permalink
Remove extra bookeeping for worker nodes (#533)
Browse files Browse the repository at this point in the history
* Remove extra bookeeping for worker nodes

* mark nodes with k8sd.io/role label
  • Loading branch information
neoaggelos committed Jul 8, 2024
1 parent 7b684f1 commit 90d6d99
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 288 deletions.
31 changes: 12 additions & 19 deletions src/k8s/pkg/k8sd/api/cluster_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"net/http"

apiv1 "github.com/canonical/k8s/api/v1"
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
nodeutil "github.com/canonical/k8s/pkg/utils/node"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/cluster"
"github.com/canonical/microcluster/state"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (e *Endpoints) postClusterRemove(s *state.State, r *http.Request) response.Response {
Expand Down Expand Up @@ -65,30 +65,23 @@ func (e *Endpoints) postClusterRemove(s *state.State, r *http.Request) response.
if err := c.DeleteClusterMember(ctx, req.Name, req.Force); err != nil {
return response.InternalError(fmt.Errorf("failed to delete cluster member %s: %w", req.Name, err))
}

return response.SyncResponse(true, nil)
}

isWorker, err := databaseutil.IsWorkerNode(ctx, s, req.Name)
client, err := snap.KubernetesClient("")
if err != nil {
return response.InternalError(fmt.Errorf("failed to check if node is worker: %w", err))
return response.InternalError(fmt.Errorf("failed to create k8s client: %w", err))
}
if isWorker {
// For worker nodes, we need to manually clean up the kubernetes node and db entry.
c, err := snap.KubernetesClient("")
if err != nil {
return response.InternalError(fmt.Errorf("failed to create k8s client: %w", err))
}

if err := c.DeleteNode(ctx, req.Name); err != nil {
return response.InternalError(fmt.Errorf("failed to remove k8s node %q: %w", req.Name, err))
}

if err := databaseutil.DeleteWorkerNodeEntry(ctx, s, req.Name); err != nil {
return response.InternalError(fmt.Errorf("failed to remove worker entry %q: %w", req.Name, err))
}
if node, err := client.CoreV1().Nodes().Get(ctx, req.Name, metav1.GetOptions{}); err != nil {
return NodeUnavailable(fmt.Errorf("node %q is not part of the cluster: %w", req.Name, err))
} else if v, ok := node.Labels["k8sd.io/role"]; !ok || v != "worker" {
return NodeUnavailable(fmt.Errorf("node %q is missing k8sd.io/role=worker label", req.Name))
}

if !isWorker && !isControlPlane {
return NodeUnavailable(fmt.Errorf("node %q is not part of the cluster", req.Name))
if err := client.DeleteNode(ctx, req.Name); err != nil {
return response.InternalError(fmt.Errorf("failed to remove k8s node %q: %w", req.Name, err))
}

return response.SyncResponse(true, nil)
}
6 changes: 0 additions & 6 deletions src/k8s/pkg/k8sd/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ func (e *Endpoints) postWorkerInfo(s *state.State, r *http.Request) response.Res
return response.InternalError(fmt.Errorf("failed to retrieve list of known kube-apiserver endpoints: %w", err))
}

if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
return database.AddWorkerNode(ctx, tx, workerName)
}); err != nil {
return response.InternalError(fmt.Errorf("add worker node transaction failed: %w", err))
}

workerToken := r.Header.Get("worker-token")
if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
return database.DeleteWorkerNodeToken(ctx, tx, workerToken)
Expand Down
1 change: 0 additions & 1 deletion src/k8s/pkg/k8sd/database/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (
SchemaExtensions = []schema.Update{
schemaApplyMigration("kubernetes-auth-tokens", "000-create.sql"),
schemaApplyMigration("cluster-configs", "000-create.sql"),
schemaApplyMigration("worker-nodes", "000-create.sql"),
schemaApplyMigration("worker-tokens", "000-create.sql"),
}

Expand Down
4 changes: 0 additions & 4 deletions src/k8s/pkg/k8sd/database/sql/queries/worker-nodes/delete.sql

This file was deleted.

4 changes: 0 additions & 4 deletions src/k8s/pkg/k8sd/database/sql/queries/worker-nodes/insert.sql

This file was deleted.

This file was deleted.

6 changes: 0 additions & 6 deletions src/k8s/pkg/k8sd/database/sql/queries/worker-nodes/select.sql

This file was deleted.

53 changes: 0 additions & 53 deletions src/k8s/pkg/k8sd/database/util/node.go

This file was deleted.

69 changes: 0 additions & 69 deletions src/k8s/pkg/k8sd/database/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (

var (
workerStmts = map[string]int{
"insert-node": MustPrepareStatement("worker-nodes", "insert.sql"),
"select-node": MustPrepareStatement("worker-nodes", "select.sql"),
"select-by-name": MustPrepareStatement("worker-nodes", "select-by-name.sql"),
"delete-node": MustPrepareStatement("worker-nodes", "delete.sql"),

"insert-token": MustPrepareStatement("worker-tokens", "insert.sql"),
"select-token": MustPrepareStatement("worker-tokens", "select.sql"),
"delete-token": MustPrepareStatement("worker-tokens", "delete-by-token.sql"),
Expand Down Expand Up @@ -68,67 +63,3 @@ func DeleteWorkerNodeToken(ctx context.Context, tx *sql.Tx, nodeName string) err
}
return nil
}

// AddWorkerNode adds a new worker node entry on the database.
func AddWorkerNode(ctx context.Context, tx *sql.Tx, name string) error {
insertTxStmt, err := cluster.Stmt(tx, workerStmts["insert-node"])
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
if _, err := insertTxStmt.ExecContext(ctx, name); err != nil {
return fmt.Errorf("insert worker node query failed: %w", err)
}
return nil
}

// CheckWorkerExists returns true if a worker node entry for this name exists.
func CheckWorkerExists(ctx context.Context, tx *sql.Tx, name string) (exists bool, err error) {
selectTxStmt, err := cluster.Stmt(tx, workerStmts["select-by-name"])
if err != nil {
return false, fmt.Errorf("failed to prepare select statement: %w", err)
}

row := selectTxStmt.QueryRowContext(ctx, name)
if err := row.Scan(new(string)); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, fmt.Errorf("select worker node %q query failed: %w", name, err)
}

return true, nil
}

// ListWorkerNodes lists the known worker nodes on the database.
func ListWorkerNodes(ctx context.Context, tx *sql.Tx) ([]string, error) {
selectTxStmt, err := cluster.Stmt(tx, workerStmts["select-node"])
if err != nil {
return nil, fmt.Errorf("failed to prepare select statement: %w", err)
}
rows, err := selectTxStmt.QueryContext(ctx)
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("select worker nodes query failed: %w", err)
}
var nodes []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("failed to parse row: %w", err)
}
nodes = append(nodes, name)
}

return nodes, nil
}

// DeleteWorkerNode deletes a worker node from the database.
func DeleteWorkerNode(ctx context.Context, tx *sql.Tx, name string) error {
deleteTxStmt, err := cluster.Stmt(tx, workerStmts["delete-node"])
if err != nil {
return fmt.Errorf("failed to prepare delete statement: %w", err)
}
if _, err := deleteTxStmt.ExecContext(ctx, name); err != nil {
return fmt.Errorf("insert worker node query failed: %w", err)
}
return nil
}
100 changes: 0 additions & 100 deletions src/k8s/pkg/k8sd/database/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,103 +72,3 @@ func TestWorkerNodeToken(t *testing.T) {
})
})
}

func TestWorkerNodes(t *testing.T) {
WithDB(t, func(ctx context.Context, db DB) {
g := NewWithT(t)
err := db.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
t.Run("Empty", func(t *testing.T) {
g := NewWithT(t)

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(BeEmpty())
})

t.Run("AddOne", func(t *testing.T) {
g := NewWithT(t)

err := database.AddWorkerNode(ctx, tx, "w1")
g.Expect(err).To(BeNil())

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(ConsistOf("w1"))

exists, err := database.CheckWorkerExists(ctx, tx, "w1")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())
})

t.Run("AddTwo", func(t *testing.T) {
g := NewWithT(t)

err := database.AddWorkerNode(ctx, tx, "w2")
g.Expect(err).To(BeNil())

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(ConsistOf("w1", "w2"))

exists, err := database.CheckWorkerExists(ctx, tx, "w1")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())

exists, err = database.CheckWorkerExists(ctx, tx, "w2")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())
})

t.Run("AddDuplicateFails", func(t *testing.T) {
g := NewWithT(t)

err := database.AddWorkerNode(ctx, tx, "w1")
g.Expect(err).To(HaveOccurred())

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(ConsistOf("w1", "w2"))
})

t.Run("Delete", func(t *testing.T) {
g := NewWithT(t)

err := database.DeleteWorkerNode(ctx, tx, "w1")
g.Expect(err).To(BeNil())

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(ConsistOf("w2"))

exists, err := database.CheckWorkerExists(ctx, tx, "w1")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeFalse())

exists, err = database.CheckWorkerExists(ctx, tx, "w2")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())
})

t.Run("ReuseName", func(t *testing.T) {
g := NewWithT(t)

err := database.AddWorkerNode(ctx, tx, "w1")
g.Expect(err).To(BeNil())

nodes, err := database.ListWorkerNodes(ctx, tx)
g.Expect(err).To(BeNil())
g.Expect(nodes).To(ConsistOf("w1", "w2"))

exists, err := database.CheckWorkerExists(ctx, tx, "w1")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())

exists, err = database.CheckWorkerExists(ctx, tx, "w2")
g.Expect(err).To(BeNil())
g.Expect(exists).To(BeTrue())
})
return nil
})
g.Expect(err).To(BeNil())
})
}
39 changes: 22 additions & 17 deletions src/k8s/pkg/k8sd/setup/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,33 @@ import (
"github.com/canonical/k8s/pkg/utils"
)

var kubeletTLSCipherSuites = []string{
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_256_GCM_SHA384",
}
var (
kubeletTLSCipherSuites = []string{
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_256_GCM_SHA384",
}

var kubeletControlPlaneLabels = []string{
"node-role.kubernetes.io/control-plane=",
}
kubeletControlPlaneLabels = []string{
"node-role.kubernetes.io/control-plane=", // mark node with role "control-plane"
"node-role.kubernetes.io/worker=", // mark node with role "worker"
"k8sd.io/role=control-plane", // mark as k8sd control plane node
}

var kubeletWorkerLabels = []string{
"node-role.kubernetes.io/worker=",
}
kubeletWorkerLabels = []string{
"node-role.kubernetes.io/worker=", // mark node with role "worker"
"k8sd.io/role=worker", // mark as k8sd worker node
}
)

// KubeletControlPlane configures kubelet on a control plane node.
func KubeletControlPlane(snap snap.Snap, hostname string, nodeIP net.IP, clusterDNS string, clusterDomain string, cloudProvider string, registerWithTaints []string, extraArgs map[string]*string) error {
return kubelet(snap, hostname, nodeIP, clusterDNS, clusterDomain, cloudProvider, registerWithTaints, append(kubeletControlPlaneLabels, kubeletWorkerLabels...), extraArgs)
return kubelet(snap, hostname, nodeIP, clusterDNS, clusterDomain, cloudProvider, registerWithTaints, kubeletControlPlaneLabels, extraArgs)
}

// KubeletWorker configures kubelet on a worker node.
Expand Down
Loading

0 comments on commit 90d6d99

Please sign in to comment.