Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bschimke95 committed Apr 16, 2024
1 parent 47f7b0d commit 51a3fad
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 49 deletions.
9 changes: 2 additions & 7 deletions src/k8s/pkg/k8sd/api/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ func (e *Endpoints) putClusterConfig(s *state.State, r *http.Request) response.R
return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err))
}

// Trigger an update of the configuration.
// Do not wait if the channel is full. The reconcilation loop will apply the most recent changes
select {
case e.provider.UpdateNodeConfigurationControllerCh() <- struct{}{}:
default:
}

if !requestedConfig.Network.Empty() {
if err := component.ReconcileNetworkComponent(r.Context(), snap, oldConfig.Network.Enabled, requestedConfig.Network.Enabled, mergedConfig); err != nil {
return response.InternalError(fmt.Errorf("failed to reconcile network: %w", err))
Expand Down Expand Up @@ -119,6 +112,8 @@ func (e *Endpoints) putClusterConfig(s *state.State, r *http.Request) response.R
}
}

e.provider.NotifyUpdateConfigMap()

return response.SyncResponse(true, &api.UpdateClusterConfigResponse{})
}

Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
type Provider interface {
MicroCluster() *microcluster.MicroCluster
Snap() snap.Snap
UpdateNodeConfigurationControllerCh() chan<- struct{}
NotifyUpdateConfigMap()
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(ctx context.Context, cfg Config) (*App, error) {
cfg.Snap,
app.readyWg.Wait,
func() (*k8s.Client, error) {
return k8s.NewClient(cfg.Snap.KubernetesNodeRESTClientGetter("kube-system"))
return k8s.NewClient(cfg.Snap.KubernetesNodeRESTClientGetter(""))
},
)

Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
// Trigger an update of the configuration.
// Do not wait if the channel is full. The reconcilation loop will apply the most recent changes
select {
case a.updateNodeConfigController.UpdateCh <- struct{}{}:
case a.updateNodeConfigController.TriggerCh <- struct{}{}:
default:
}

Expand Down
7 changes: 7 additions & 0 deletions src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,12 @@ func (a *App) onStart(s *state.State) error {
})
}

// start update node config controller
if a.updateNodeConfigController != nil {
go a.updateNodeConfigController.Run(s.Context, func(ctx context.Context) (types.ClusterConfig, error) {
return utils.GetClusterConfig(ctx, s)
})
}

return nil
}
4 changes: 2 additions & 2 deletions src/k8s/pkg/k8sd/app/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func (a *App) Snap() snap.Snap {
return a.snap
}

func (a *App) UpdateNodeConfigurationControllerCh() chan<- struct{} {
return a.updateNodeConfigController.UpdateCh
func (a *App) NotifyUpdateConfigMap() {
a.updateNodeConfigController.TriggerCh <- struct{}{}
}

// Ensure App implements api.Provider
Expand Down
15 changes: 6 additions & 9 deletions src/k8s/pkg/k8sd/controllers/update_node_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
)

// UpdateNodeConfigurationController asynchronously performs updates of the cluster config.
// An updates is triggered by sending the patch to the updateCh.
// An updates is triggered by sending to the TriggerCh.
type UpdateNodeConfigurationController struct {
snap snap.Snap
waitReady func()
newK8sClient func() (*k8s.Client, error)
// UpdateCh is used to trigger config updates.
UpdateCh chan struct{}
// TriggerCh is used to trigger config updates.
TriggerCh chan struct{}
// ReconciledCh is used to indicate that a reconcilation loop has finished.
ReconciledCh chan struct{}
}
Expand All @@ -30,7 +30,7 @@ func NewUpdateNodeConfigurationController(snap snap.Snap, waitReady func(), newK
snap: snap,
waitReady: waitReady,
newK8sClient: newK8sClient,
UpdateCh: make(chan struct{}, 1),
TriggerCh: make(chan struct{}, 1),
ReconciledCh: make(chan struct{}, 1),
}
}
Expand All @@ -53,7 +53,7 @@ func (c *UpdateNodeConfigurationController) retryNewK8sClient(ctx context.Contex
// Run starts the controller.
// Run accepts a context to manage the lifecycle of the controller.
// Run accepts a function that retrieves the current cluster configuration.
// Run will loop everytime a cluster config patch is sent to the UpdateCh.
// Run will loop everytime the TriggerCh is triggered.
func (c *UpdateNodeConfigurationController) Run(ctx context.Context, getClusterConfig func(context.Context) (types.ClusterConfig, error)) {
c.waitReady()

Expand All @@ -66,7 +66,7 @@ func (c *UpdateNodeConfigurationController) Run(ctx context.Context, getClusterC
select {
case <-ctx.Done():
return
case <-c.UpdateCh:
case <-c.TriggerCh:
}

if isWorker, err := snaputil.IsWorker(c.snap); err != nil {
Expand All @@ -89,9 +89,6 @@ func (c *UpdateNodeConfigurationController) Run(ctx context.Context, getClusterC

select {
case c.ReconciledCh <- struct{}{}:
case <-time.After(10 * time.Second):
log.Println("failed to reconcile cluster config in time - will fetch latest config and retry.")
continue
default:
}
}
Expand Down
57 changes: 29 additions & 28 deletions src/k8s/pkg/k8sd/controllers/update_node_configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,39 @@ import (
"time"

"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap/mock"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

func TestUpdateNodeConfigurationController(t *testing.T) {
testCases := []struct {
name string
initialConfig map[string]string
expectedConfig map[string]string
initialConfig types.ClusterConfig
expectedConfig types.ClusterConfig
expectedFailure bool
}{
{
name: "ControlPlane_DefaultConfig",
initialConfig: map[string]string{
"test": "data",
},
expectedConfig: map[string]string{
"ClusterDNS": "cluster.local",
name: "ControlPlane_DefaultConfig",
initialConfig: types.ClusterConfig{},
expectedConfig: types.ClusterConfig{
Kubelet: types.Kubelet{
ClusterDomain: vals.Pointer("cluster.local"),
},
},
expectedFailure: false,
},
{
name: "ControlPlane_EmptyConfig",
initialConfig: map[string]string{
"test": "data",
},
expectedConfig: nil, // Expecting empty ConfigMap data
name: "ControlPlane_EmptyConfig",
initialConfig: types.ClusterConfig{},
expectedConfig: types.ClusterConfig{},
expectedFailure: true,
},
}
Expand All @@ -65,40 +63,43 @@ func TestUpdateNodeConfigurationController(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

configProvider := &configProvider{}
configProvider := &configProvider{config: tc.expectedConfig}
kubeletConfigMap, err := tc.initialConfig.Kubelet.ToConfigMap()
g.Expect(err).ToNot(HaveOccurred())

configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "k8sd-config",
Namespace: "kube-system",
},
Data: tc.initialConfig,
Data: kubeletConfigMap,
}
clientset := fake.NewSimpleClientset(configMap)

if !tc.expectedFailure {
clientset.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
configMap.Data = tc.expectedConfig
return true, nil, nil
})
}

ctrl := controllers.NewUpdateNodeConfigurationController(s, func() {}, func() (*k8s.Client, error) {
return &k8s.Client{Interface: clientset}, nil
})
go ctrl.Run(ctx, configProvider.getConfig)

select {
case ctrl.UpdateCh <- struct{}{}:
case ctrl.TriggerCh <- struct{}{}:
case <-time.After(channelSendTimeout):
g.Fail("Timed out while attempting to trigger controller reconcile loop")
}
<-ctrl.ReconciledCh

select {
case <-ctrl.ReconciledCh:
case <-time.After(channelSendTimeout):
}

result, err := clientset.CoreV1().ConfigMaps("kube-system").Get(ctx, "k8sd-config", metav1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())
expectedConfigMap, err := tc.expectedConfig.Kubelet.ToConfigMap()
g.Expect(err).ToNot(HaveOccurred())
if tc.expectedFailure {
g.Expect(configMap.Data).ToNot(Equal(tc.expectedConfig))
g.Expect(result.Data).ToNot(Equal(expectedConfigMap))
} else {
g.Expect(configMap.Data).To(Equal(tc.expectedConfig))
g.Expect(result.Data).To(Equal(expectedConfigMap))
}
})
}
Expand Down

0 comments on commit 51a3fad

Please sign in to comment.