Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
pmem-csi-driver: configurable QPS+burst
Browse files Browse the repository at this point in the history
Client-side throttling in client-go may get in the way when running
scale testing, so it should be configurable.
  • Loading branch information
pohly committed Dec 18, 2020
1 parent 351f850 commit 93bf487
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
18 changes: 3 additions & 15 deletions pkg/k8sutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// NewClient connects to an API server either through KUBECONFIG (if set) or
// through the in-cluster env variables.
func NewClient() (kubernetes.Interface, error) {
func NewClient(qps float64, burst int) (kubernetes.Interface, error) {
var config *rest.Config
var err error

Expand All @@ -34,20 +34,8 @@ func NewClient() (kubernetes.Interface, error) {
if err != nil {
return nil, fmt.Errorf("create Kubernetes REST config: %v", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("create Kubernetes client: %v", err)
}
return client, nil
}

// NewInClusterClient connects code that runs inside a Kubernetes pod to the
// API server.
func NewInClusterClient() (kubernetes.Interface, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("build in-cluster Kubernetes client configuration: %v", err)
}
config.QPS = float32(qps)
config.Burst = burst
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("create Kubernetes client: %v", err)
Expand Down
18 changes: 6 additions & 12 deletions pkg/pmem-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"k8s.io/klog/v2"

api "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1"
"github.com/intel/pmem-csi/pkg/k8sutil"
"github.com/intel/pmem-csi/pkg/logger"
pmemcommon "github.com/intel/pmem-csi/pkg/pmem-common"
)
Expand All @@ -41,6 +40,9 @@ func init() {
flag.StringVar(&config.CertFile, "certFile", "pmem-registry.pem", "SSL certificate file to use for authenticating client connections")
flag.StringVar(&config.KeyFile, "keyFile", "pmem-registry-key.pem", "Private key file associated to certificate")

flag.Float64Var(&config.KubeAPIQPS, "kube-api-qps", 5, "QPS to use while communicating with the Kubernetes apiserver. Defaults to 5.0.")
flag.IntVar(&config.KubeAPIBurst, "kube-api-burst", 10, "Burst to use while communicating with the Kubernetes apiserver. Defaults to 10.")

/* metrics options */
flag.StringVar(&config.metricsListen, "metricsListen", "", "listen address (like :8001) for prometheus metrics endpoint, disabled by default")
flag.StringVar(&config.metricsPath, "metricsPath", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
Expand All @@ -66,17 +68,9 @@ func Main() int {

klog.V(3).Info("Version: ", version)

if config.schedulerListen != "" {
if config.Mode != Webhooks {
pmemcommon.ExitError("scheduler listening", errors.New("only supported in the controller"))
return 1
}
c, err := k8sutil.NewClient()
if err != nil {
pmemcommon.ExitError("scheduler setup", err)
return 1
}
config.client = c
if config.schedulerListen != "" && config.Mode != Webhooks {
pmemcommon.ExitError("scheduler listening", errors.New("only supported in the controller"))
return 1
}

config.Version = version
Expand Down
30 changes: 24 additions & 6 deletions pkg/pmem-csi-driver/pmem-csi-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

api "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1"
grpcserver "github.com/intel/pmem-csi/pkg/grpc-server"
"github.com/intel/pmem-csi/pkg/k8sutil"
pmdmanager "github.com/intel/pmem-csi/pkg/pmem-device-manager"
pmemgrpc "github.com/intel/pmem-csi/pkg/pmem-grpc"
pmemstate "github.com/intel/pmem-csi/pkg/pmem-state"
Expand Down Expand Up @@ -117,9 +118,16 @@ type Config struct {
// PmemPercentage percentage of space to be used by the driver in each PMEM region
PmemPercentage uint

// KubeAPIQPS is the average rate of requests to the Kubernetes API server,
// enforced locally in client-go.
KubeAPIQPS float64

// KubeAPIQPS is the number of requests that a client is
// allowed to send above the average rate of request.
KubeAPIBurst int

// parameters for Kubernetes scheduler extender
schedulerListen string
client kubernetes.Interface

// parameters for Prometheus metrics
metricsListen string
Expand Down Expand Up @@ -188,6 +196,16 @@ func (csid *csiDriver) Run() error {
)
csid.gatherers = append(csid.gatherers, cmm.GetRegistry())

var client kubernetes.Interface
if config.schedulerListen != "" ||
csid.cfg.Mode == Node {
c, err := k8sutil.NewClient(config.KubeAPIQPS, config.KubeAPIBurst)
if err != nil {
return fmt.Errorf("connect to apiserver: %v", err)
}
client = c
}

switch csid.cfg.Mode {
case Webhooks:
namespace := os.Getenv("POD_NAMESPACE")
Expand All @@ -198,13 +216,13 @@ func (csid *csiDriver) Run() error {
if csid.cfg.schedulerListen == "" {
return errors.New("webhooks mode needs a scheduler listen address")
}
factory := informers.NewSharedInformerFactoryWithOptions(csid.cfg.client, resyncPeriod,
factory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
informers.WithNamespace(namespace),
)
podLister := factory.Core().V1().Pods().Lister()
c := scheduler.CapacityViaMetrics(namespace, csid.cfg.DriverName, podLister)
factory.Start(ctx.Done())
if _, err := csid.startScheduler(ctx, cancel, c); err != nil {
if _, err := csid.startScheduler(ctx, cancel, client, c); err != nil {
return err
}
case Node:
Expand Down Expand Up @@ -260,14 +278,14 @@ func (csid *csiDriver) Run() error {
// logs errors and cancels the context when it runs into a problem,
// either during the startup phase (blocking) or later at runtime (in
// a go routine).
func (csid *csiDriver) startScheduler(ctx context.Context, cancel func(), c scheduler.Capacity) (string, error) {
factory := informers.NewSharedInformerFactory(csid.cfg.client, resyncPeriod)
func (csid *csiDriver) startScheduler(ctx context.Context, cancel func(), client kubernetes.Interface, c scheduler.Capacity) (string, error) {
factory := informers.NewSharedInformerFactory(client, resyncPeriod)
pvcLister := factory.Core().V1().PersistentVolumeClaims().Lister()
scLister := factory.Storage().V1().StorageClasses().Lister()
sched, err := scheduler.NewScheduler(
csid.cfg.DriverName,
c,
csid.cfg.client,
client,
pvcLister,
scLister,
)
Expand Down

0 comments on commit 93bf487

Please sign in to comment.