feat: add support for katalyst
JackZxj committed Jan 16, 2024
1 parent 90e7d5e commit 85e709c
Showing 29 changed files with 1,302 additions and 35 deletions.
4 changes: 2 additions & 2 deletions cmd/controller-manager/app/core.go
@@ -171,10 +171,9 @@ func startFederatedClusterController(
controllerCtx.FederatedInformerManager,
controllerCtx.Metrics,
klog.Background(),
controllerCtx.ComponentConfig.ClusterJoinTimeout,
controllerCtx.WorkerCount,
controllerCtx.FedSystemNamespace,
controllerCtx.ComponentConfig.ResourceAggregationNodeFilter,
controllerCtx.ComponentConfig,
)
if err != nil {
return nil, fmt.Errorf("error creating federate controller: %w", err)
@@ -204,6 +203,7 @@ func startScheduler(
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
controllerCtx.ComponentConfig.EnableKatalystSupport,
)
if err != nil {
return nil, fmt.Errorf("error creating scheduler: %w", err)
21 changes: 21 additions & 0 deletions cmd/controller-manager/app/options/options.go
@@ -29,6 +29,8 @@ import (

const (
DefaultPort = 11257

MinClusterHealthCheckPeriod = 15 * time.Second
)

type Options struct {
@@ -64,6 +66,9 @@ type Options struct {
PrometheusQuantiles map[string]string

ResourceAggregationNodeFilter []string

EnableKatalystSupport bool
ClusterHealthCheckPeriod time.Duration
}

func NewOptions() *Options {
@@ -153,6 +158,22 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl
"If the flag is provided multiple times, "+
"nodes are excluded as long as at least one of the selectors is matched.",
)

flags.BoolVar(
&o.EnableKatalystSupport,
"enable-katalyst-support",
false,
"Enable katalyst support in the cluster controller and scheduler. When enabled, the cluster controller "+
"collects katalyst CNR resources into the FederatedCluster status if the katalyst plugin is enabled in "+
"the cluster's annotations, and the scheduler enables the KatalystResourcesExist filter and supports "+
"splitting replicas by katalyst resources.",
)
flags.DurationVar(
&o.ClusterHealthCheckPeriod,
"cluster-health-check-period",
time.Second*30,
"The interval between health checks for member clusters. The minimum value is "+MinClusterHealthCheckPeriod.String()+".",
)
}

func (o *Options) addKlogFlags(flags *pflag.FlagSet) {
6 changes: 6 additions & 0 deletions cmd/controller-manager/app/util.go
@@ -183,6 +183,12 @@ func getComponentConfig(opts *options.Options) (*controllercontext.ComponentConf
componentConfig := &controllercontext.ComponentConfig{
ClusterJoinTimeout: opts.ClusterJoinTimeout,
MemberObjectEnqueueDelay: opts.MemberObjectEnqueueDelay,
EnableKatalystSupport: opts.EnableKatalystSupport,
ClusterHealthCheckPeriod: opts.ClusterHealthCheckPeriod,
}

if opts.ClusterHealthCheckPeriod < options.MinClusterHealthCheckPeriod {
componentConfig.ClusterHealthCheckPeriod = options.MinClusterHealthCheckPeriod
}

if opts.NSAutoPropExcludeRegexp != "" {
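For illustration only, and not part of this commit: a minimal, self-contained Go sketch of the clamping behavior above, using the 15-second minimum and 30-second flag default that appear in this diff (the helper name effectivePeriod is hypothetical).

package main

import (
	"fmt"
	"time"
)

// Mirrors options.MinClusterHealthCheckPeriod from this commit.
const minClusterHealthCheckPeriod = 15 * time.Second

// effectivePeriod reproduces the clamp in getComponentConfig: any requested
// period below the minimum is raised to the minimum.
func effectivePeriod(requested time.Duration) time.Duration {
	if requested < minClusterHealthCheckPeriod {
		return minClusterHealthCheckPeriod
	}
	return requested
}

func main() {
	fmt.Println(effectivePeriod(10 * time.Second)) // 15s: below the minimum, clamped
	fmt.Println(effectivePeriod(30 * time.Second)) // 30s: the flag default, unchanged
}

In other words, passing --cluster-health-check-period=10s still results in a 15-second health-check period.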
1 change: 1 addition & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/kubewharf/katalyst-api v0.3.3
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/pkg/errors v0.9.1
2 changes: 2 additions & 0 deletions go.sum
@@ -264,6 +264,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.3.3 h1:/aE7PmOr5IFXgNTDe4sbmjsqw48o1QeIrLlr18t5/7s=
github.com/kubewharf/katalyst-api v0.3.3/go.mod h1:iVILS5UL5PRtkUPH2Iu1K/gFGTPMNItnth5fmQ80VGE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
2 changes: 2 additions & 0 deletions pkg/controllers/context/context.go
@@ -83,4 +83,6 @@ type ComponentConfig struct {
ClusterJoinTimeout time.Duration
MemberObjectEnqueueDelay time.Duration
ResourceAggregationNodeFilter []labels.Selector
EnableKatalystSupport bool
ClusterHealthCheckPeriod time.Duration
}
190 changes: 185 additions & 5 deletions pkg/controllers/federatedcluster/clusterstatus.go
@@ -25,22 +25,29 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
apiextv1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic/dynamicinformer"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/plugins"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/stats/metrics"
clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster"
"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
"github.com/kubewharf/kubeadmiral/pkg/util/logging"
"github.com/kubewharf/kubeadmiral/pkg/util/resource"
)

const (
@@ -60,8 +67,166 @@
ClusterReachableMsg = "Cluster is reachable"
ClusterNotReachableReason = "ClusterNotReachable"
ClusterNotReachableMsg = "Cluster is not reachable"

// maxHealthCheckTimeout caps the per-request timeout of the /healthz probe;
// the probe uses half of the configured health-check period, bounded by this value.
maxHealthCheckTimeout = 30 * time.Second
)

// resourceCollector carries, for one external resource plugin, the plugin
// itself, the informer sync checks and dynamic listers of the GVRs it
// watches, and the allocatable/available resources it reports.
type resourceCollector struct {
name string
plugin plugins.Plugin
hasSynceds map[schema.GroupVersionResource]cache.InformerSynced
dynamicLister map[schema.GroupVersionResource]cache.GenericLister

allocatable, available corev1.ResourceList
}

// getExternalResourceCollectors builds one resourceCollector per plugin
// resolved from the cluster's annotations, backed by the cluster's dynamic
// informer factory. It returns nil if plugin resolution fails or no informer
// factory has been registered for the cluster yet.
func (c *FederatedClusterController) getExternalResourceCollectors(
cluster *fedcorev1a1.FederatedCluster,
) (resourceCollectors []resourceCollector) {
pluginMaps, err := plugins.ResolvePlugins(cluster.Annotations)
if err != nil {
c.logger.V(4).Info("Failed to get cluster plugins", "cluster", cluster.Name, "err", err)
return nil
}

clusterKey, _ := informermanager.DefaultClusterConnectionHash(cluster)
c.lock.Lock()
defer c.lock.Unlock()

if factory := c.externalClusterResourceInformers[string(clusterKey)]; factory != nil {
for pluginName, plugin := range pluginMaps {
rc := resourceCollector{
name: pluginName,
plugin: plugin,
hasSynceds: map[schema.GroupVersionResource]cache.InformerSynced{},
dynamicLister: map[schema.GroupVersionResource]cache.GenericLister{},
}

for gvr := range plugin.ClusterResourcesToCollect() {
rc.hasSynceds[gvr] = factory.ForResource(gvr).Informer().HasSynced
rc.dynamicLister[gvr] = factory.ForResource(gvr).Lister()
}
resourceCollectors = append(resourceCollectors, rc)
}
}

return resourceCollectors
}

// addOrUpdateExternalClusterResourceInformers ensures a dynamic shared
// informer factory exists for the cluster's plugin resources, rebuilding it
// when the cluster connection or the set of enabled plugins changes and
// removing it when no plugins are enabled.
func (c *FederatedClusterController) addOrUpdateExternalClusterResourceInformers(
cluster *fedcorev1a1.FederatedCluster,
) {
key, _ := informermanager.DefaultClusterConnectionHash(cluster)
clusterKey := string(key)

pluginMaps, err := plugins.ResolvePlugins(cluster.Annotations)
if err != nil {
c.logger.V(4).Info("Failed to get cluster plugins", "cluster", cluster.Name, "err", err)
return
}
if len(pluginMaps) == 0 {
c.removeExternalClusterResourceInformers(cluster.Name)
return
}
pluginsHash := plugins.PluginsHash(pluginMaps)

c.lock.Lock()
defer c.lock.Unlock()

if old, exist := c.clusterConnectionHashes[cluster.Name]; exist {
// Connection and plugins are unchanged, do nothing
if old == clusterKey && c.enabledClusterResourcePluginHashes[cluster.Name] == pluginsHash {
return
}
// Otherwise, either the connection or the set of enabled plugins has
// changed. Since a cluster shares a single informer factory (and context),
// the old factory must be cancelled here and rebuilt below in both cases.
if cancel := c.externalClusterResourceInformerCancelFuncs[old]; cancel != nil {
cancel()
}
delete(c.externalClusterResourceInformers, old)
delete(c.externalClusterResourceInformerCancelFuncs, old)
}

client, ok := c.federatedInformerManager.GetClusterDynamicClient(cluster.Name)
if !ok {
return
}
informer := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
ctx, cancel := context.WithCancel(context.Background())
for _, plugin := range pluginMaps {
for gvr := range plugin.ClusterResourcesToCollect() {
informer.ForResource(gvr)
}
}
informer.Start(ctx.Done())

c.clusterConnectionHashes[cluster.Name] = clusterKey
c.enabledClusterResourcePluginHashes[cluster.Name] = pluginsHash
c.externalClusterResourceInformers[clusterKey] = informer
c.externalClusterResourceInformerCancelFuncs[clusterKey] = cancel
}

// removeExternalClusterResourceInformers stops the cluster's external
// resource informer factory, if any, and drops the related bookkeeping.
func (c *FederatedClusterController) removeExternalClusterResourceInformers(clusterName string) {
c.lock.Lock()
defer c.lock.Unlock()

key := c.clusterConnectionHashes[clusterName]
if cancel := c.externalClusterResourceInformerCancelFuncs[key]; cancel != nil {
cancel()
}

delete(c.clusterConnectionHashes, clusterName)
delete(c.enabledClusterResourcePluginHashes, clusterName)
delete(c.externalClusterResourceInformers, key)
delete(c.externalClusterResourceInformerCancelFuncs, key)
}

// collectExternalClusterResources runs each collector's plugin concurrently,
// waiting up to 5 seconds for its informers to sync, and writes the collected
// resources back into the resourceCollectors slice.
func collectExternalClusterResources(
ctx context.Context,
availableNodes []*corev1.Node,
pods []*corev1.Pod,
resourceCollectors []resourceCollector,
) {
wg := sync.WaitGroup{}
for i, collector := range resourceCollectors {
wg.Add(1)
go func(i int, collector resourceCollector) {
defer wg.Done()

ctx, logger := logging.InjectLoggerValues(ctx, "plugin", collector.name)
ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

hasSynceds := make([]cache.InformerSynced, 0, len(collector.hasSynceds))
names := make([]string, 0, len(collector.hasSynceds))
for gvr, synced := range collector.hasSynceds {
hasSynceds = append(hasSynceds, synced)
names = append(names, gvr.String())
}
if !cache.WaitForCacheSync(ctxWithTimeout.Done(), hasSynceds...) {
logger.V(4).Info("Timeout waiting for informers sync",
"informers", strings.Join(names, ";"))
return
}

allocatable, available, err := collector.plugin.CollectClusterResources(
ctx,
availableNodes,
pods,
plugins.ClusterHandle{DynamicLister: collector.dynamicLister},
)
if err != nil {
logger.V(4).Info("Failed to collect cluster resources", "err", err)
return
}
resourceCollectors[i].allocatable = allocatable
resourceCollectors[i].available = available
}(i, collector)
}
wg.Wait()
}
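The plugins package imported at the top of this file is not included in the shown diff. Inferred purely from the call sites above, its types plausibly look like the sketch below; the names used in this file (Plugin, ClusterHandle, ClusterResourcesToCollect, CollectClusterResources, ResolvePlugins, PluginsHash) come from the diff, but the exact signatures and field types are assumptions.

package plugins

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
)

// ClusterHandle hands a plugin the listers of the cluster-scoped resources it
// asked the controller to watch.
type ClusterHandle struct {
	DynamicLister map[schema.GroupVersionResource]cache.GenericLister
}

// Plugin collects extra cluster resources (for katalyst, CNR objects) that
// are merged into the FederatedCluster status.
type Plugin interface {
	// ClusterResourcesToCollect lists the GVRs to watch in the member
	// cluster on behalf of this plugin.
	ClusterResourcesToCollect() map[schema.GroupVersionResource]struct{}

	// CollectClusterResources computes the allocatable and available
	// resources contributed by this plugin.
	CollectClusterResources(
		ctx context.Context,
		nodes []*corev1.Node,
		pods []*corev1.Pod,
		handle ClusterHandle,
	) (allocatable, available corev1.ResourceList, err error)
}

Under the same assumption, ResolvePlugins(annotations) returns the enabled plugins keyed by name (map[string]Plugin, error), and PluginsHash hashes that set for the change detection in addOrUpdateExternalClusterResourceInformers.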

func (c *FederatedClusterController) collectIndividualClusterStatus(
ctx context.Context,
cluster *fedcorev1a1.FederatedCluster,
@@ -91,7 +256,11 @@ func (c *FederatedClusterController) collectIndividualClusterStatus(
cluster = cluster.DeepCopy()
conditionTime := metav1.Now()

offlineStatus, readyStatus := checkReadyByHealthz(ctx, discoveryClient)
timeout := c.clusterHealthCheckConfig.Period / 2
if timeout > maxHealthCheckTimeout {
timeout = maxHealthCheckTimeout
}
offlineStatus, readyStatus := checkReadyByHealthz(ctx, discoveryClient, timeout)
var readyReason, readyMessage string
switch readyStatus {
case corev1.ConditionTrue:
@@ -114,6 +283,7 @@
podsSynced,
nodeLister,
nodesSynced,
c.getExternalResourceCollectors(cluster),
); err != nil {
logger.Error(err, "Failed to update cluster resources")
readyStatus = corev1.ConditionFalse
@@ -167,10 +337,11 @@
func checkReadyByHealthz(
ctx context.Context,
clusterDiscoveryClient discovery.DiscoveryInterface,
timeout time.Duration,
) (offline, ready corev1.ConditionStatus) {
logger := klog.FromContext(ctx)

body, err := clusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Timeout(30 * time.Second).Do(ctx).Raw()
body, err := clusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Timeout(timeout).Do(ctx).Raw()
if err != nil {
logger.Error(err, "Cluster health check failed")
return corev1.ConditionTrue, corev1.ConditionUnknown
@@ -193,10 +364,11 @@ func (c *FederatedClusterController) updateClusterResources(
podsSynced cache.InformerSynced,
nodeLister corev1listers.NodeLister,
nodesSynced cache.InformerSynced,
resourceCollectors []resourceCollector,
) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), podsSynced, nodesSynced) {
if !cache.WaitForCacheSync(ctxWithTimeout.Done(), podsSynced, nodesSynced) {
return fmt.Errorf("timeout waiting for node and pod informer sync")
}

@@ -210,13 +382,21 @@
}

schedulableNodes := int64(0)
availableNodes := make([]*corev1.Node, 0, len(nodes))
for _, node := range nodes {
if isNodeSchedulable(node) && !c.isNodeFiltered(node) {
schedulableNodes++
availableNodes = append(availableNodes, node)
}
}
allocatable, available := c.aggregateResources(availableNodes, pods)

collectExternalClusterResources(ctx, availableNodes, pods, resourceCollectors)
for _, collector := range resourceCollectors {
resource.MergeResources(collector.allocatable, allocatable)
resource.MergeResources(collector.available, available)
}

allocatable, available := c.aggregateResources(nodes, pods)
clusterStatus.Resources = fedcorev1a1.Resources{
SchedulableNodes: &schedulableNodes,
Allocatable: allocatable,
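Note, also not part of this commit: the merge step in updateClusterResources above calls resource.MergeResources from pkg/util/resource, whose implementation is not shown in this diff. A hedged sketch of what such a merge might do, assuming the first argument is merged into the second; the real implementation may differ.

package resource

import (
	corev1 "k8s.io/api/core/v1"
)

// mergeResources is a hypothetical stand-in for MergeResources(src, dst):
// every quantity in src is added to the matching entry in dst, and entries
// missing from dst are copied over.
func mergeResources(src, dst corev1.ResourceList) {
	for name, quantity := range src {
		if existing, ok := dst[name]; ok {
			existing.Add(quantity)
			dst[name] = existing
		} else {
			dst[name] = quantity.DeepCopy()
		}
	}
}

Under that assumption, plugin-reported resources (for example katalyst CNR capacity) simply top up the node-derived allocatable and available totals before they are written into the cluster status.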
(Diffs for the remaining changed files are not shown here.)
