Skip to content

Commit

Permalink
fix: bugfix for status, statusaggregator and auto-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
JackZxj committed Aug 8, 2023
1 parent 558adf3 commit 17e5245
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 51 deletions.
21 changes: 3 additions & 18 deletions config/sample/extra/pod-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,18 @@ kind: FederatedTypeConfig
metadata:
name: pods
spec:
federatedType:
group: types.kubeadmiral.io
kind: FederatedPod
pluralName: federatedpods
scope: Namespaced
version: v1alpha1
targetType:
kind: Pod
pluralName: pods
scope: Namespaced
version: v1
sourceType:
kind: Pod
pluralName: pods
scope: Namespaced
version: v1
statusType:
group: types.kubeadmiral.io
kind: FederatedPodStatus
pluralName: federatedpodstatuses
scope: Namespaced
version: v1alpha1
statusAggregation: Enabled
statusAggregation:
enabled: true
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
statusCollection:
enabled: true
fields:
- metadata.creationTimestamp
- spec.nodeName
Expand Down
6 changes: 0 additions & 6 deletions config/sample/host/01-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ spec:
version: v1
statusAggregation:
enabled: true
revisionHistory:
enabled: true
autoMigration:
enabled: true
controllers:
Expand Down Expand Up @@ -258,8 +256,6 @@ spec:
pluralName: statefulsets
scope: Namespaced
version: v1
revisionHistory:
enabled: true
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
Expand All @@ -281,8 +277,6 @@ spec:
pluralName: daemonsets
scope: Namespaced
version: v1
revisionHistory:
enabled: true
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
Expand Down
22 changes: 16 additions & 6 deletions pkg/controllers/automigration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,18 @@ func (c *Controller) getPodsFromCluster(
return nil, false, fmt.Errorf("failed to get plugin for FTC: %w", err)
}

client, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
dynamicClient, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
if !exist {
return nil, true, fmt.Errorf("failed to get client for cluster: %w", err)
return nil, true, fmt.Errorf("failed to get dynamic client for cluster %s", clusterName)
}
kubeClient, exist := c.federatedInformer.GetClusterKubeClient(clusterName)
if !exist {
return nil, true, fmt.Errorf("failed to get kube client for cluster: %s", clusterName)
}

pods, err := plugin.GetPodsForClusterObject(ctx, unsClusterObj, plugins.ClusterHandle{
Client: client,
DynamicClient: dynamicClient,
KubeClient: kubeClient,
})
if err != nil {
return nil, true, fmt.Errorf("failed to get pods for federated object: %w", err)
Expand All @@ -515,9 +520,13 @@ func (c *Controller) getPossibleSourceObjectsFromCluster(
pod *corev1.Pod,
clusterName string,
) (possibleQualifies []common.QualifiedName, err error) {
client, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
dynamicClient, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
if !exist {
return nil, fmt.Errorf("failed to get dynamic client for cluster %s", clusterName)
}
kubeClient, exist := c.federatedInformer.GetClusterKubeClient(clusterName)
if !exist {
return nil, fmt.Errorf("failed to get client for cluster %s", clusterName)
return nil, fmt.Errorf("failed to get kube client for cluster %s", clusterName)
}

for gvk, plugin := range plugins.NativePlugins {
Expand All @@ -527,7 +536,8 @@ func (c *Controller) getPossibleSourceObjectsFromCluster(
continue
}
object, found, err := plugin.GetTargetObjectFromPod(ctx, pod.DeepCopy(), plugins.ClusterHandle{
Client: client,
DynamicClient: dynamicClient,
KubeClient: kubeClient,
})
if err != nil || !found {
logger.V(3).Info(
Expand Down
22 changes: 4 additions & 18 deletions pkg/controllers/automigration/plugins/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,12 @@ func (*deploymentPlugin) GetPodsForClusterObject(
}

rsList, err := deploymentutil.ListReplicaSets(deployment, func(ns string, opts metav1.ListOptions) ([]*appsv1.ReplicaSet, error) {
rsList := &appsv1.ReplicaSetList{}
opts = *opts.DeepCopy()
opts.ResourceVersion = "0" // list from watch cache
unsRsList, err := handle.Client.
Resource(common.ReplicaSetGVR).
Namespace(ns).
List(ctx, opts)
rsList, err := handle.KubeClient.AppsV1().ReplicaSets(ns).List(ctx, opts)
if err != nil {
return nil, err
}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsRsList.Object, rsList); err != nil {
return nil, err
}
ret := []*appsv1.ReplicaSet{}
for i := range rsList.Items {
ret = append(ret, &rsList.Items[i])
Expand All @@ -74,22 +67,15 @@ func (*deploymentPlugin) GetPodsForClusterObject(
deployment,
[]*appsv1.ReplicaSet{newRS},
func(ns string, opts metav1.ListOptions) (*corev1.PodList, error) {
podList := &corev1.PodList{}
opts = *opts.DeepCopy()
opts.ResourceVersion = "0" // list from watch cache
if err != nil {
return nil, err
}
unsPodList, err := handle.Client.
Resource(common.PodGVR).
Namespace(ns).
List(ctx, opts)
podList, err := handle.KubeClient.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return nil, err
}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsPodList.Object, podList); err != nil {
return nil, err
}
return podList, nil
},
)
Expand All @@ -110,7 +96,7 @@ func (*deploymentPlugin) GetTargetObjectFromPod(
pod *corev1.Pod,
handle ClusterHandle,
) (obj *unstructured.Unstructured, found bool, err error) {
rs, found, err := GetSpecifiedOwnerFromObj(ctx, handle.Client, pod, metav1.APIResource{
rs, found, err := GetSpecifiedOwnerFromObj(ctx, handle.DynamicClient, pod, metav1.APIResource{
Name: "replicasets",
Group: appsv1.GroupName,
Version: "v1",
Expand All @@ -120,7 +106,7 @@ func (*deploymentPlugin) GetTargetObjectFromPod(
return nil, false, err
}

return GetSpecifiedOwnerFromObj(ctx, handle.Client, rs, metav1.APIResource{
return GetSpecifiedOwnerFromObj(ctx, handle.DynamicClient, rs, metav1.APIResource{
Name: "deployments",
Group: appsv1.GroupName,
Version: "v1",
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/automigration/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

type ClusterHandle struct {
Client dynamic.Interface
DynamicClient dynamic.Interface
KubeClient kubernetes.Interface
}

type Plugin interface {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ func NewStatusController(
if lastApplied == nil || latest == nil {
return true
}
return lastApplied.IsStatusCollectionEnabled() != latest.IsStatusCollectionEnabled()
return lastApplied.IsStatusCollectionEnabled() != latest.IsStatusCollectionEnabled() ||
(lastApplied.Spec.StatusCollection != nil && latest.Spec.StatusCollection != nil &&
!reflect.DeepEqual(lastApplied.Spec.StatusCollection.Fields, latest.Spec.StatusCollection.Fields))
},
Generator: func(ftc *fedcorev1a1.FederatedTypeConfig) cache.ResourceEventHandler {
if !ftc.IsStatusCollectionEnabled() {
Expand Down
11 changes: 10 additions & 1 deletion pkg/controllers/statusaggregator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federate"
"github.com/kubewharf/kubeadmiral/pkg/controllers/statusaggregator/plugins"
"github.com/kubewharf/kubeadmiral/pkg/stats"
clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster"
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewStatusAggregatorController(
clusterQueue: workqueue.NewNamedDelayingQueue(StatusAggregatorControllerName),
clusterAvailableDelay: clusterAvailableDelay,
clusterUnavailableDelay: clusterUnavailableDelay,
objectEnqueueDelay: 10 * time.Second,
objectEnqueueDelay: 3 * time.Second,

eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, StatusAggregatorControllerName, 4),
metrics: metrics,
Expand Down Expand Up @@ -219,6 +220,14 @@ func NewStatusAggregatorController(
if !obj.GetDeletionTimestamp().IsZero() || !ftc.GetStatusAggregationEnabled() {
return
}
if anno := obj.GetAnnotations(); anno != nil {
if noFederatedResource, ok := anno[federate.NoFederatedResource]; ok {
if len(noFederatedResource) > 0 {
logger.V(3).Info("No-federated-resource annotation found, skip status aggregation")
return
}
}
}
a.worker.Enqueue(reconcileKey{
gvk: ftc.GetSourceTypeGVK(),
namespace: obj.GetNamespace(),
Expand Down

0 comments on commit 17e5245

Please sign in to comment.