storage capacity: initial implementation
This is the producer side of KEP
https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking.

Currently, only the deployment mode with a central controller is
implemented.

Syncing directly whenever there is a change can emit a large number of
updates. When changes happen rapidly (for example, while a driver gets
deployed), it may be better to delay processing and thus combine
multiple changes into a single sync.
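
A minimal, standalone sketch of that batching idea using client-go's
workqueue package (the same package the controller below is wired up
with); the key string and the syncCapacity helper are illustrative and
not part of this commit. Identical keys enqueued while a delay is
pending collapse into a single work item, so a burst of changes ends up
as one sync:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// syncCapacity stands in for the actual work of refreshing one
// CSIStorageCapacity object (illustrative only).
func syncCapacity(key string) {
	fmt.Println("syncing", key)
}

func main() {
	// The rate-limiting work queue deduplicates identical keys: adding the
	// same key repeatedly while it is still waiting yields a single item.
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "csistoragecapacity")
	defer queue.ShutDown()

	// A burst of changes for the same topology segment and storage class...
	for i := 0; i < 10; i++ {
		queue.AddAfter("segment-a/fast-storage", time.Second)
	}

	// ...is handled by a single delayed sync once the item becomes ready.
	go func() {
		for {
			key, shutdown := queue.Get()
			if shutdown {
				return
			}
			syncCapacity(key.(string))
			queue.Forget(key)
			queue.Done(key)
		}
	}()

	time.Sleep(2 * time.Second)
}

The capacity controller in the diff below is handed a queue created the
same way ("csistoragecapacity"), which leaves room for exactly this
kind of coalescing.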
pohly committed Jul 14, 2020
1 parent e309104 commit f2c7497
Showing 10 changed files with 2,359 additions and 1 deletion.
cmd/csi-provisioner/csi-provisioner.go: 60 changes (59 additions & 1 deletion)
@@ -26,7 +26,9 @@ import (
	"strings"
	"time"

	"github.com/container-storage-interface/spec/lib/go/csi"
	flag "github.com/spf13/pflag"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
@@ -43,6 +45,8 @@ import (
	"github.com/kubernetes-csi/csi-lib-utils/deprecatedflags"
	"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
	"github.com/kubernetes-csi/csi-lib-utils/metrics"
	"github.com/kubernetes-csi/external-provisioner/pkg/capacity"
	"github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology"
	ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
	snapclientset "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
)
@@ -58,7 +62,8 @@ var (
	retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
	retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
	workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
	finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaniously running threads, handling cloning finalizer removal")
	finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
	capacityThreads = flag.Uint("storage-capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
	operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume")
	_ = deprecatedflags.Add("provisioner")

@@ -76,6 +81,12 @@ var (
	kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
	kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

	capacityFeatures = func() *capacity.Features {
		capacity := &capacity.Features{}
		flag.Var(capacity, "enable-capacity", "Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. Currently supported: -enable-capacity=central.")
		return capacity
	}()

	featureGates map[string]bool
	provisionController *controller.ProvisionController
	version = "unknown"
@@ -266,6 +277,50 @@ func main() {
		controllerCapabilities,
	)

	var capacityController *capacity.Controller
	if (*capacityFeatures)[capacity.FeatureCentral] {
		// TODO: use parameters instead?
		podName := os.Getenv("POD_NAME")
		namespace := os.Getenv("POD_NAMESPACE")
		if podName == "" || namespace == "" {
			klog.Fatalf("need POD_NAMESPACE/POD_NAME env variables, have only POD_NAMESPACE=%q and POD_NAME=%q", namespace, podName)
		}
		pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
		if err != nil {
			klog.Fatalf("error getting our own pod: %v", err)
		}
		var controller *metav1.OwnerReference
		for _, owner := range pod.OwnerReferences {
			if owner.Controller != nil && *owner.Controller {
				controller = &owner
				break
			}
		}
		if controller == nil {
			klog.Fatal("pod does not have a controller which owns it")
		}

		topologyInformer := topology.NewNodeTopology(
			provisionerName,
			clientset,
			factory.Core().V1().Nodes(),
			factory.Storage().V1().CSINodes(),
			workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
		)

		capacityController = capacity.NewCentralCapacityController(
			csi.NewControllerClient(grpcClient),
			provisionerName,
			clientset,
			// TODO: metrics for the queue?!
			workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
			*controller,
			namespace,
			topologyInformer,
			factory.Storage().V1().StorageClasses(),
		)
	}

	run := func(ctx context.Context) {
		factory.Start(ctx.Done())
		cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
@@ -275,6 +330,9 @@
			}
		}

		if capacityController != nil {
			go capacityController.Run(ctx, int(*capacityThreads))
		}
		if csiClaimController != nil {
			go csiClaimController.Run(ctx, int(*finalizerThreads))
		}