diff --git a/pkg/k8sutil/client.go b/pkg/k8sutil/client.go index 8dc5e341ba..4a70709f9d 100644 --- a/pkg/k8sutil/client.go +++ b/pkg/k8sutil/client.go @@ -22,7 +22,7 @@ import ( // NewClient connects to an API server either through KUBECONFIG (if set) or // through the in-cluster env variables. -func NewClient() (kubernetes.Interface, error) { +func NewClient(qps float64, burst int) (kubernetes.Interface, error) { var config *rest.Config var err error @@ -34,20 +34,8 @@ func NewClient() (kubernetes.Interface, error) { if err != nil { return nil, fmt.Errorf("create Kubernetes REST config: %v", err) } - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, fmt.Errorf("create Kubernetes client: %v", err) - } - return client, nil -} - -// NewInClusterClient connects code that runs inside a Kubernetes pod to the -// API server. -func NewInClusterClient() (kubernetes.Interface, error) { - config, err := rest.InClusterConfig() - if err != nil { - return nil, fmt.Errorf("build in-cluster Kubernetes client configuration: %v", err) - } + config.QPS = float32(qps) + config.Burst = burst client, err := kubernetes.NewForConfig(config) if err != nil { return nil, fmt.Errorf("create Kubernetes client: %v", err) diff --git a/pkg/pmem-csi-driver/main.go b/pkg/pmem-csi-driver/main.go index ef9e7622fc..c69961a52f 100644 --- a/pkg/pmem-csi-driver/main.go +++ b/pkg/pmem-csi-driver/main.go @@ -15,7 +15,6 @@ import ( "k8s.io/klog/v2" api "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1" - "github.com/intel/pmem-csi/pkg/k8sutil" "github.com/intel/pmem-csi/pkg/logger" pmemcommon "github.com/intel/pmem-csi/pkg/pmem-common" ) @@ -41,6 +40,9 @@ func init() { flag.StringVar(&config.CertFile, "certFile", "pmem-registry.pem", "SSL certificate file to use for authenticating client connections") flag.StringVar(&config.KeyFile, "keyFile", "pmem-registry-key.pem", "Private key file associated to certificate") + flag.Float64Var(&config.KubeAPIQPS, "kube-api-qps", 5, "QPS to use while communicating with the Kubernetes apiserver. Defaults to 5.0.") + flag.IntVar(&config.KubeAPIBurst, "kube-api-burst", 10, "Burst to use while communicating with the Kubernetes apiserver. Defaults to 10.") + /* metrics options */ flag.StringVar(&config.metricsListen, "metricsListen", "", "listen address (like :8001) for prometheus metrics endpoint, disabled by default") flag.StringVar(&config.metricsPath, "metricsPath", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") @@ -66,17 +68,9 @@ func Main() int { klog.V(3).Info("Version: ", version) - if config.schedulerListen != "" { - if config.Mode != Webhooks { - pmemcommon.ExitError("scheduler listening", errors.New("only supported in the controller")) - return 1 - } - c, err := k8sutil.NewClient() - if err != nil { - pmemcommon.ExitError("scheduler setup", err) - return 1 - } - config.client = c + if config.schedulerListen != "" && config.Mode != Webhooks { + pmemcommon.ExitError("scheduler listening", errors.New("only supported in the controller")) + return 1 } config.Version = version diff --git a/pkg/pmem-csi-driver/pmem-csi-driver.go b/pkg/pmem-csi-driver/pmem-csi-driver.go index 87b7164036..862b97e8f3 100644 --- a/pkg/pmem-csi-driver/pmem-csi-driver.go +++ b/pkg/pmem-csi-driver/pmem-csi-driver.go @@ -22,6 +22,7 @@ import ( api "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1" grpcserver "github.com/intel/pmem-csi/pkg/grpc-server" + "github.com/intel/pmem-csi/pkg/k8sutil" pmdmanager "github.com/intel/pmem-csi/pkg/pmem-device-manager" pmemgrpc "github.com/intel/pmem-csi/pkg/pmem-grpc" pmemstate "github.com/intel/pmem-csi/pkg/pmem-state" @@ -117,9 +118,16 @@ type Config struct { // PmemPercentage percentage of space to be used by the driver in each PMEM region PmemPercentage uint + // KubeAPIQPS is the average rate of requests to the Kubernetes API server, + // enforced locally in client-go. + KubeAPIQPS float64 + + // KubeAPIQPS is the number of requests that a client is + // allowed to send above the average rate of request. + KubeAPIBurst int + // parameters for Kubernetes scheduler extender schedulerListen string - client kubernetes.Interface // parameters for Prometheus metrics metricsListen string @@ -188,6 +196,16 @@ func (csid *csiDriver) Run() error { ) csid.gatherers = append(csid.gatherers, cmm.GetRegistry()) + var client kubernetes.Interface + if config.schedulerListen != "" || + csid.cfg.Mode == Node { + c, err := k8sutil.NewClient(config.KubeAPIQPS, config.KubeAPIBurst) + if err != nil { + return fmt.Errorf("connect to apiserver: %v", err) + } + client = c + } + switch csid.cfg.Mode { case Webhooks: namespace := os.Getenv("POD_NAMESPACE") @@ -198,13 +216,13 @@ func (csid *csiDriver) Run() error { if csid.cfg.schedulerListen == "" { return errors.New("webhooks mode needs a scheduler listen address") } - factory := informers.NewSharedInformerFactoryWithOptions(csid.cfg.client, resyncPeriod, + factory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod, informers.WithNamespace(namespace), ) podLister := factory.Core().V1().Pods().Lister() c := scheduler.CapacityViaMetrics(namespace, csid.cfg.DriverName, podLister) factory.Start(ctx.Done()) - if _, err := csid.startScheduler(ctx, cancel, c); err != nil { + if _, err := csid.startScheduler(ctx, cancel, client, c); err != nil { return err } case Node: @@ -260,14 +278,14 @@ func (csid *csiDriver) Run() error { // logs errors and cancels the context when it runs into a problem, // either during the startup phase (blocking) or later at runtime (in // a go routine). -func (csid *csiDriver) startScheduler(ctx context.Context, cancel func(), c scheduler.Capacity) (string, error) { - factory := informers.NewSharedInformerFactory(csid.cfg.client, resyncPeriod) +func (csid *csiDriver) startScheduler(ctx context.Context, cancel func(), client kubernetes.Interface, c scheduler.Capacity) (string, error) { + factory := informers.NewSharedInformerFactory(client, resyncPeriod) pvcLister := factory.Core().V1().PersistentVolumeClaims().Lister() scLister := factory.Storage().V1().StorageClasses().Lister() sched, err := scheduler.NewScheduler( csid.cfg.DriverName, c, - csid.cfg.client, + client, pvcLister, scLister, )