Merge pull request #450 from pohly/storage-capacity
storage capacity producer
k8s-ci-robot authored Aug 18, 2020

2 parents cb437bf + e50daf3 commit e909258
Showing 50 changed files with 7,661 additions and 34 deletions.
120 changes: 119 additions & 1 deletion README.md
@@ -25,6 +25,7 @@ Following table reflects the head of this branch.
| -------------- | ------- | ------- | --------------------------------------------------------------------------------------------- | --------------------------------- |
| Snapshots | Beta | On | [Snapshots and Restore](https://kubernetes-csi.github.io/docs/snapshot-restore-feature.html). | No |
| CSIMigration | Beta | On | [Migrating in-tree volume plugins to CSI](https://kubernetes.io/docs/concepts/storage/volumes/#csi-migration). | No |
| CSIStorageCapacity | Alpha | Off | Publish [capacity information](https://kubernetes.io/docs/concepts/storage/volumes/#storage-capacity) for the Kubernetes scheduler. | No |

All other external-provisioner features and the external-provisioner itself are considered GA and fully supported.

@@ -61,14 +62,28 @@ Note that the external-provisioner does not scale with more replicas. Only one e

* `--kube-api-burst <num>`: Burst for clients that communicate with the kubernetes apiserver. Defaults to `10`.

* `--cloning-protection-threads <num>`: Number of simultaniously running threads, handling cloning finalizer removal. Defaults to `1`.
* `--cloning-protection-threads <num>`: Number of simultaneously running threads, handling cloning finalizer removal. Defaults to `1`.

* `--metrics-address`: The TCP network address where the prometheus metrics endpoint will run (example: `:8080` which corresponds to port 8080 on local host). The default is empty string, which means metrics endpoint is disabled.

* `--metrics-path`: The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.

* `--extra-create-metadata`: Enables the injection of extra PVC and PV metadata as parameters when calling `CreateVolume` on the driver (keys: "csi.storage.k8s.io/pvc/name", "csi.storage.k8s.io/pvc/namespace", "csi.storage.k8s.io/pv/name")

##### Storage capacity arguments

See the [storage capacity section](#capacity-support) below for details.

* `--capacity-threads <num>`: Number of simultaneously running threads, handling CSIStorageCapacity objects. Defaults to `1`.

* `--capacity-poll-interval <interval>`: How long the external-provisioner waits before checking for storage capacity changes. Defaults to `1m`.

* `--capacity-controller-deployment-mode central|none`: Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. 'central' is currently the only supported mode. Use it when there is just one active provisioner in the cluster. Defaults to `none`.

* `--capacity-ownerref-level <levels>`: The level indicates the number of objects that need to be traversed starting from the pod identified by the POD_NAME and POD_NAMESPACE environment variables to reach the owning object for CSIStorageCapacity objects: 0 for the pod itself, 1 for a StatefulSet, 2 for a Deployment, etc. Defaults to `1` (= StatefulSet).

* `--capacity-for-immediate-binding <bool>`: Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging. Defaults to `false`.

#### Other recognized arguments
* `--feature-gates <gates>`: A set of comma separated `<feature-name>=<true|false>` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default.

@@ -102,6 +117,109 @@ Yes | No | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` w
No | Irrelevant | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with randomly selected node topology as first element
No | Irrelevant | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with randomly selected node topology as first element

### Capacity support

> :warning: *Warning:* This is an alpha feature and only supported by
> Kubernetes >= 1.19 if the `CSIStorageCapacity` feature gate is
> enabled.

The external-provisioner can be used to create CSIStorageCapacity
objects that hold information about the storage capacity available
through the driver. The Kubernetes scheduler then [uses that
information](https://kubernetes.io/docs/concepts/storage/storage-capacity)
when selecting nodes for pods with unbound volumes that wait for the
first consumer.

Currently, all CSIStorageCapacity objects created by an instance of
the external-provisioner must have the same
[owner](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#owners-and-dependents). That
owner is how external-provisioner distinguishes between objects that
it must manage and those that it must leave alone. The owner is
determined with the `POD_NAME` and `POD_NAMESPACE` environment variables
and the `--capacity-ownerref-level` parameter. Other solutions will be
added in the future.

To enable this feature in a driver deployment (see also the
[`deploy/kubernetes/storage-capacity.yaml`](deploy/kubernetes/storage-capacity.yaml)
example):

- Set the `POD_NAME` and `POD_NAMESPACE` environment variables like this:
  ```yaml
  env:
  - name: POD_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
  - name: POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  ```
- Add `--capacity-controller-deployment-mode=central` to the command line flags.
- Add `StorageCapacity: true` to the CSIDriver information object.
Without it, external-provisioner will publish information, but the
Kubernetes scheduler will ignore it. This makes it possible to first
deploy the driver without that field and then, once sufficient
information has been published, enable the scheduler's use of it.
- If external-provisioner is not deployed with a StatefulSet, then
configure with `--capacity-ownerref-level` which object is meant to own
CSIStorageCapacity objects.
- Optional: configure how often external-provisioner polls the driver
to detect changed capacity with `--capacity-poll-interval`.
- Optional: configure how many worker threads are used in parallel
with `--capacity-threads`.
- Optional: enable producing information also for storage classes that
use immediate volume binding with
`--capacity-for-immediate-binding`. This is usually not needed
because such volumes are created by the driver without involving the
Kubernetes scheduler and thus the published information would just
be ignored.

To determine how many different topology segments exist,
external-provisioner uses the topology keys and labels that the CSI
driver instance on each node reports to kubelet in the
`NodeGetInfoResponse.accessible_topology` field. The keys are stored
by kubelet in the CSINode objects and the actual values in Node
labels.

A CSI driver must report topology information that matches the storage
pool(s) that it has access to, with granularity that matches the most
restrictive pool.

For example, if the driver runs in a node with region/rack topology
and has access to per-region storage as well as per-rack storage, then
the driver should report topology with region/rack as its keys. If it
only has access to per-region storage, then it should just use region
as key. If it uses region/rack, then redundant CSIStorageCapacity
objects will be published, but the information is still correct. See
the
[KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#with-central-controller)
for details.
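The effect of the chosen topology keys on the number of segments can be illustrated with a small sketch. It is not the code from `pkg/capacity/topology`; the function `segmentsFor` and the plain label maps are hypothetical stand-ins for the informer-based implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// segmentsFor returns the distinct topology segments found in nodeLabels,
// considering only the given topology keys. Each segment is rendered as a
// sorted "key=value,key=value" string.
// Hypothetical helper for illustration only.
func segmentsFor(keys []string, nodeLabels []map[string]string) []string {
	seen := map[string]bool{}
	for _, labels := range nodeLabels {
		parts := make([]string, 0, len(keys))
		for _, k := range keys {
			if v, ok := labels[k]; ok {
				parts = append(parts, k+"="+v)
			}
		}
		sort.Strings(parts)
		seen[strings.Join(parts, ",")] = true
	}
	segments := make([]string, 0, len(seen))
	for s := range seen {
		segments = append(segments, s)
	}
	sort.Strings(segments)
	return segments
}

func main() {
	nodes := []map[string]string{
		{"region": "west", "rack": "r1"},
		{"region": "west", "rack": "r2"},
		{"region": "west", "rack": "r1"},
	}
	// With only "region" as key, all nodes share one segment.
	fmt.Println(len(segmentsFor([]string{"region"}, nodes))) // prints 1
	// With "region" and "rack" as keys, each rack is its own segment.
	fmt.Println(len(segmentsFor([]string{"region", "rack"}, nodes))) // prints 2
}
```

With region/rack keys and only per-region storage, both rack segments would report the same capacity, which is the redundancy mentioned above.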

For each segment and each storage class, CSI `GetCapacity` is called
once with the topology of the segment and the parameters of the
class. If there is no error and the capacity is non-zero, a
CSIStorageCapacity object is created or updated (if it
already exists from a prior call) with that information. Obsolete
objects are removed.
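The per-segment, per-class loop can be sketched roughly as follows. This is a simplified model, not the controller from `pkg/capacity`: the types `pair` and `capacityFunc` and the function `reconcile` are hypothetical, and keeping the previous value when the driver call fails is an assumption made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// pair identifies one CSIStorageCapacity-like entry (hypothetical type).
type pair struct{ segment, storageClass string }

// capacityFunc stands in for the CSI GetCapacity call (hypothetical type).
type capacityFunc func(segment, storageClass string) (int64, error)

// reconcile returns the entries that should exist after one pass.
// Entries in existing whose segment or class is no longer listed are
// dropped, which models the removal of obsolete objects.
func reconcile(existing map[pair]int64, segments, classes []string, getCapacity capacityFunc) map[pair]int64 {
	result := map[pair]int64{}
	for _, s := range segments {
		for _, c := range classes {
			k := pair{s, c}
			capacity, err := getCapacity(s, c)
			switch {
			case err != nil:
				// Assumption: keep the previous value, if any, and retry later.
				if old, ok := existing[k]; ok {
					result[k] = old
				}
			case capacity > 0:
				// Create or update the entry.
				result[k] = capacity
			}
		}
	}
	return result
}

func main() {
	existing := map[pair]int64{
		{"rack-0", "fast"}: 50, // segment no longer exists: obsolete
		{"rack-3", "fast"}: 70, // driver call fails below: value is kept
	}
	getCapacity := func(segment, class string) (int64, error) {
		switch segment {
		case "rack-1":
			return 100, nil
		case "rack-3":
			return 0, errors.New("driver unavailable")
		default:
			return 0, nil
		}
	}
	result := reconcile(existing, []string{"rack-1", "rack-2", "rack-3"}, []string{"fast"}, getCapacity)
	fmt.Println(result)
}
```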

To ensure that CSIStorageCapacity objects get removed when the
external-provisioner gets removed from the cluster, they all have an
owner and therefore get garbage-collected when that owner
disappears. The owner is not the external-provisioner pod itself but
rather one of its parents as specified by `--capacity-ownerref-level`.
This way, it is possible to switch between external-provisioner
instances without losing the already gathered information.
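How `--capacity-ownerref-level` selects the owner can be pictured as walking up a chain of controller references. The sketch below uses a hypothetical in-memory `object` type and `ownerAtLevel` function; the real lookup in `pkg/owner` queries the API server and is not shown in this diff.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a simplified stand-in for a Kubernetes object: a kind and
// name plus a pointer to the controller that owns it, if any.
type object struct {
	kind, name string
	controller *object
}

// ownerAtLevel follows controller references upwards from start.
// Level 0 returns start itself.
func ownerAtLevel(start *object, level int) (*object, error) {
	if level < 0 {
		return nil, errors.New("level must not be negative")
	}
	current := start
	for i := 0; i < level; i++ {
		if current.controller == nil {
			return nil, fmt.Errorf("%s %q has no controller, cannot reach level %d", current.kind, current.name, level)
		}
		current = current.controller
	}
	return current, nil
}

func main() {
	deployment := &object{kind: "Deployment", name: "csi-provisioner"}
	replicaSet := &object{kind: "ReplicaSet", name: "csi-provisioner-abc", controller: deployment}
	pod := &object{kind: "Pod", name: "csi-provisioner-abc-xyz", controller: replicaSet}

	owner, err := ownerAtLevel(pod, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(owner.kind, owner.name) // prints "Deployment csi-provisioner"
}
```

For a pod created directly by a StatefulSet, level 1 already reaches the StatefulSet, which matches the default.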

CSIStorageCapacity objects are namespaced and get created in the
namespace of the external-provisioner. Only CSIStorageCapacity objects
with the right owner are modified by external-provisioner and their
name is generated, so it is possible to deploy different drivers in
the same namespace. However, Kubernetes does not check who is creating
CSIStorageCapacity objects, so in theory a malfunctioning or malicious
driver deployment could also publish incorrect information about some
other driver.

### CSI error and timeout handling
The external-provisioner invokes all gRPC calls to CSI driver with timeout provided by `--timeout` command line argument (15 seconds by default).

75 changes: 74 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
@@ -26,7 +26,9 @@ import (
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -43,7 +45,10 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/deprecatedflags"
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology"
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
"github.com/kubernetes-csi/external-provisioner/pkg/owner"
snapclientset "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
)

@@ -58,7 +63,8 @@ var (
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaniously running threads, handling cloning finalizer removal")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume")
_ = deprecatedflags.Add("provisioner")

@@ -76,6 +82,15 @@ var (
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

capacityMode = func() *capacity.DeploymentMode {
mode := capacity.DeploymentModeNone
flag.Var(&mode, "capacity-controller-deployment-mode", "Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. 'central' is currently the only supported mode. Use it when there is just one active provisioner in the cluster.")
return &mode
}()
capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
capacityPollInterval = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
capacityOwnerrefLevel = flag.Int("capacity-ownerref-level", 1, "The level indicates the number of objects that need to be traversed starting from the pod identified by the POD_NAME and POD_NAMESPACE environment variables to reach the owning object for CSIStorageCapacity objects: 0 for the pod itself, 1 for a StatefulSet, 2 for a Deployment, etc.")

featureGates map[string]bool
provisionController *controller.ProvisionController
version = "unknown"
@@ -181,6 +196,7 @@ func main() {
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity

// -------------------------------
// Listers
@@ -266,15 +282,72 @@ func main() {
controllerCapabilities,
)

var capacityController *capacity.Controller
if *capacityMode == capacity.DeploymentModeCentral {
podName := os.Getenv("POD_NAME")
namespace := os.Getenv("POD_NAMESPACE")
if podName == "" || namespace == "" {
klog.Fatalf("need POD_NAMESPACE/POD_NAME env variables, have only POD_NAMESPACE=%q and POD_NAME=%q", namespace, podName)
}
controller, err := owner.Lookup(config, namespace, podName,
schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
}, *capacityOwnerrefLevel)
if err != nil {
klog.Fatalf("look up owner(s) of pod: %v", err)
}
klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)

topologyInformer := topology.NewNodeTopology(
provisionerName,
clientset,
factory.Core().V1().Nodes(),
factory.Storage().V1().CSINodes(),
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
)

// We only need objects from our own namespace. The normal factory would give
// us an informer for the entire cluster.
factoryForNamespace = informers.NewSharedInformerFactoryWithOptions(clientset,
ctrl.ResyncPeriodOfCsiNodeInformer,
informers.WithNamespace(namespace),
)

capacityController = capacity.NewCentralCapacityController(
csi.NewControllerClient(grpcClient),
provisionerName,
clientset,
// TODO: metrics for the queue?!
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
*controller,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
factoryForNamespace.Storage().V1alpha1().CSIStorageCapacities(),
*capacityPollInterval,
*capacityImmediateBinding,
)
}

run := func(ctx context.Context) {
factory.Start(ctx.Done())
if factoryForNamespace != nil {
// Starting is enough, the capacity controller will
// wait for sync.
factoryForNamespace.Start(ctx.Done())
}
cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync Informers!")
}
}

if capacityController != nil {
go capacityController.Run(ctx, int(*capacityThreads))
}
if csiClaimController != nil {
go csiClaimController.Run(ctx, int(*finalizerThreads))
}
15 changes: 15 additions & 0 deletions deploy/kubernetes/rbac.yaml
@@ -87,6 +87,21 @@ rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "watch", "list", "delete", "update", "create"]
# Permissions for CSIStorageCapacity are only needed when enabling the
# publishing of storage capacity information.
- apiGroups: ["storage.k8s.io"]
resources: ["csistoragecapacities"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# The GET permissions below are needed for walking up the ownership chain
# for CSIStorageCapacity. They are sufficient for deployment via
# StatefulSet (only needs to get Pod) and Deployment (needs to get
# Pod and then ReplicaSet to find the Deployment).
- apiGroups: [""]
resources: ["pods"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["get"]

---
kind: RoleBinding
57 changes: 57 additions & 0 deletions deploy/kubernetes/storage-capacity.yaml
@@ -0,0 +1,57 @@
# This YAML file demonstrates how to enable the
# storage capacity feature when deploying the
# external provisioner, in this example together
# with the mock CSI driver.
#
# It depends on the RBAC definitions from rbac.yaml.
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: csi-provisioner
spec:
  replicas: 3
  selector:
    matchLabels:
      app: csi-provisioner
  template:
    metadata:
      labels:
        app: csi-provisioner
    spec:
      serviceAccount: csi-provisioner
      containers:
        - name: csi-provisioner
          image: k8s.gcr.io/sig-storage/csi-provisioner:v2.0.0
          args:
            - "--csi-address=$(ADDRESS)"
            - "--leader-election"
            - "--capacity-controller-deployment-mode=central"
            - "--capacity-ownerref-level=2"
          env:
            - name: ADDRESS
              value: /var/lib/csi/sockets/pluginproxy/mock.socket
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - name: socket-dir
              mountPath: /var/lib/csi/sockets/pluginproxy/

        - name: mock-driver
          image: quay.io/k8scsi/mock-driver:canary
          env:
            - name: CSI_ENDPOINT
              value: /var/lib/csi/sockets/pluginproxy/mock.socket
          volumeMounts:
            - name: socket-dir
              mountPath: /var/lib/csi/sockets/pluginproxy/
      volumes:
        - name: socket-dir
          emptyDir:
1 change: 1 addition & 0 deletions go.mod
@@ -18,6 +18,7 @@ require (
k8s.io/csi-translation-lib v0.19.0-rc.2
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.19.0-rc.2
sigs.k8s.io/controller-runtime v0.6.2
sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1
)

24 changes: 24 additions & 0 deletions go.sum
@@ -181,6 +181,8 @@ github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0 h1:QvGt2nLcHH0WK9orKa+ppBPAxREcH364nPUedEpK0TY=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/zapr v0.1.0 h1:h+WVe9j6HAA01niTJPA/kKH0i7e0rLZBCwauQFcRE54=
github.com/go-logr/zapr v0.1.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
github.com/go-openapi/analysis v0.18.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
@@ -288,6 +290,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g=
github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU=
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
@@ -311,6 +314,8 @@ github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible/go.mod h1:bB9ly3RchcQqsQ9CpyaQwvva7RS5ytVoSoholZQON6o=
github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7UkZt1i4FQeQy0R2T8GLUwQhOP5M1gBhy4=
@@ -419,6 +424,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -429,12 +436,16 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
@@ -482,6 +493,7 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
@@ -555,6 +567,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345 h1:2gOG36vt1BhUqpzxwZLZJxUim2dHB05vw+RAn4Q6YOU=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345/go.mod h1:skWido08r9w6Lq/w70DO5XYIKMu4QFu1+4VsqLQuJy8=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
@@ -637,6 +650,8 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -671,6 +686,7 @@ golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -739,6 +755,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.0.1 h1:xyiBuvkD2g5n7cYzx6u2sxQvsAy4QJsZFCzGVdzOXZ0=
gomodules.xyz/jsonpatch/v2 v2.0.1/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
@@ -826,6 +844,8 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -835,6 +855,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/api v0.19.0-rc.2 h1:Lq0owhvgpWXmMtz+t2AT/JJpIAPX9X8lK3oE2qslYCU=
k8s.io/api v0.19.0-rc.2/go.mod h1:9nHeM2gbqeaL7yN6UFvOxKzLG5gZ4v+DJ6bpavDetZo=
k8s.io/apiextensions-apiserver v0.19.0-rc.2 h1:K57jvXQhrmyr58vEBWlO2eaTpDdtTOOnSIL2cnDc9Oc=
k8s.io/apiextensions-apiserver v0.19.0-rc.2/go.mod h1:LkNk/VUFXmwgURxOOQz3FJEjX/Ls0bwkq5/LIGTipIM=
k8s.io/apimachinery v0.19.0-rc.2 h1:JScnJRuwKHT8RmdrsFMkE4Oi+SVI/QIWFGOOhNZJe/M=
k8s.io/apimachinery v0.19.0-rc.2/go.mod h1:eHbWZVMaaewmYBAUuRYnAmTTMtDhvpPNZuh8/6Yl7v0=
@@ -877,6 +898,7 @@ k8s.io/metrics v0.19.0-rc.2/go.mod h1:wtTMGMCxx0brO15Nf1KqwuDjSmH3QoyX6gx8FPnmi4
k8s.io/sample-apiserver v0.19.0-rc.2/go.mod h1:ujEUq5dgRk6COe/8PIP0YM9HrJrjsqh73+UCxQxjZPw=
k8s.io/system-validators v1.1.2/go.mod h1:bPldcLgkIUK22ALflnsXk8pvkTEndYdNuaHH6gRrl0Q=
k8s.io/utils v0.0.0-20200414100711-2df71ebbae66/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20200720150651-0bdb4ca86cbc h1:GiXZzevctVRRBh56shqcqB9s9ReWMU6GTsFyE2RCFJQ=
k8s.io/utils v0.0.0-20200720150651-0bdb4ca86cbc/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
@@ -890,6 +912,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9 h1:rusRLrDhjBp6aYtl9sGEvQJr6faoHoDLd0YcUBTZguI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0=
sigs.k8s.io/controller-runtime v0.6.2 h1:jkAnfdTYBpFwlmBn3pS5HFO06SfxvnTZ1p5PeEF/zAA=
sigs.k8s.io/controller-runtime v0.6.2/go.mod h1:vhcq/rlnENJ09SIRp3EveTaZ0yqH526hjf9iJdbUJ/E=
sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU=
sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1 h1:n7bIUaBsWmTUHqwJatYiNa2ZspjeQyzZwxfE4D4G4zQ=
sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1/go.mod h1:N+Ctyyr/Vwp8WkAG6DjpxcG0yWPlKSTj24RjzWzBSME=
632 changes: 632 additions & 0 deletions pkg/capacity/capacity.go

Large diffs are not rendered by default.

1,173 changes: 1,173 additions & 0 deletions pkg/capacity/capacity_test.go

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions pkg/capacity/doc.go
@@ -0,0 +1,19 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package capacity contains the code which controls the CSIStorageCapacity
// objects owned by the external-provisioner.
package capacity
63 changes: 63 additions & 0 deletions pkg/capacity/mode.go
@@ -0,0 +1,63 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacity

import (
"errors"
"strings"

flag "github.com/spf13/pflag"
)

// DeploymentMode determines how the capacity controller operates.
type DeploymentMode string

const (
// DeploymentModeCentral enables the mode where there is only one
// external-provisioner actively running in the cluster which
// talks to the CSI driver's controller service.
DeploymentModeCentral = DeploymentMode("central")

// DeploymentModeLocal enables the mode where external-provisioner
// is deployed on each node. Not implemented yet.
DeploymentModeLocal = DeploymentMode("local")

// DeploymentModeNone disables capacity support.
DeploymentModeNone = DeploymentMode("none")
)

// Set validates the given value and stores it as the deployment mode.
// Only "central" and "none" are accepted; "local" is not implemented yet.
func (mode *DeploymentMode) Set(value string) error {
switch DeploymentMode(value) {
case DeploymentModeCentral, DeploymentModeNone:
*mode = DeploymentMode(value)
default:
return errors.New("invalid value")
}
return nil
}

func (mode *DeploymentMode) String() string {
return string(*mode)
}

func (mode *DeploymentMode) Type() string {
return strings.Join([]string{string(DeploymentModeCentral), string(DeploymentModeNone)}, "|")
}

var _ flag.Value = new(DeploymentMode)
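
Review note: the sketch below shows how a type with `Set` and `String` methods plugs into flag parsing. It is a minimal illustration under stated assumptions, not the code in this commit: it uses the standard library `flag` package instead of `pflag` (so no `Type` method is needed), `deploymentMode` is a local stand-in for `DeploymentMode`, and the flag name `deployment-mode` and the helper `parseMode` are hypothetical.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
)

// deploymentMode is a local stand-in for capacity.DeploymentMode.
type deploymentMode string

const (
	modeCentral = deploymentMode("central")
	modeNone    = deploymentMode("none")
)

// Set accepts only the implemented modes and rejects everything else.
func (m *deploymentMode) Set(value string) error {
	switch deploymentMode(value) {
	case modeCentral, modeNone:
		*m = deploymentMode(value)
		return nil
	default:
		return errors.New("invalid value")
	}
}

func (m *deploymentMode) String() string { return string(*m) }

// parseMode (hypothetical helper) parses args with a hypothetical
// -deployment-mode flag and returns the resulting mode.
func parseMode(args []string) (deploymentMode, error) {
	mode := modeNone // default: capacity support disabled
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the example output
	fs.Var(&mode, "deployment-mode", "central|none")
	err := fs.Parse(args)
	return mode, err
}

func main() {
	mode, err := parseMode([]string{"-deployment-mode=central"})
	fmt.Println(mode, err)

	// "local" is declared as a constant in the commit but not accepted by Set.
	_, err = parseMode([]string{"-deployment-mode=local"})
	fmt.Println(err != nil)
}
```

Because validation happens in `Set`, an unsupported mode is rejected while the command line is parsed, before any controller code runs.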
21 changes: 21 additions & 0 deletions pkg/capacity/topology/doc.go
@@ -0,0 +1,21 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package topology contains an abstract interface for discovering
// topology segments for a storage backend and a specific implementation
// which does that based on the CSINodeDriver.TopologyKeys and the
// corresponding labels for the nodes.
package topology
336 changes: 336 additions & 0 deletions pkg/capacity/topology/nodes.go
@@ -0,0 +1,336 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topology

import (
"context"
"reflect"
"sort"
"sync"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

// NewNodeTopology returns an informer that synthesizes storage
// topology segments based on the accessible topology that each CSI
// driver node instance reports. See
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#with-central-controller
// for details.
func NewNodeTopology(
driverName string,
client kubernetes.Interface,
nodeInformer coreinformersv1.NodeInformer,
csiNodeInformer storageinformersv1.CSINodeInformer,
queue workqueue.RateLimitingInterface,
) Informer {
nt := &nodeTopology{
driverName: driverName,
client: client,
nodeInformer: nodeInformer,
csiNodeInformer: csiNodeInformer,
queue: queue,
}

// Whenever Node or CSINode objects change, we need to
// recalculate the new topology segments. We could do that
// immediately, but it is better to let the input data settle
// a bit and just remember that there is work to be done.
nodeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("added object: expected Node, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: new node: %s", node.Name)
queue.Add("")
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
klog.Errorf("original object: expected Node, got %T -> ignoring it", oldObj)
return
}
newNode, ok := newObj.(*v1.Node)
if !ok {
klog.Errorf("updated object: expected Node, got %T -> ignoring it", newObj)
return
}
if reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
// Shortcut: labels haven't changed, no need to sync.
return
}
klog.V(5).Infof("capacity topology: updated node: %s", newNode.Name)
queue.Add("")
},
DeleteFunc: func(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("deleted object: expected Node, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: removed node: %s", node.Name)
queue.Add("")
},
}
nodeInformer.Informer().AddEventHandler(nodeHandler)
csiNodeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csiNode, ok := obj.(*storagev1.CSINode)
if !ok {
klog.Errorf("added object: expected CSINode, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: new CSINode: %s", csiNode.Name)
queue.Add("")
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldCSINode, ok := oldObj.(*storagev1.CSINode)
if !ok {
klog.Errorf("original object: expected CSINode, got %T -> ignoring it", oldObj)
return
}
newCSINode, ok := newObj.(*storagev1.CSINode)
if !ok {
klog.Errorf("updated object: expected CSINode, got %T -> ignoring it", newObj)
return
}
oldKeys := nt.driverTopologyKeys(oldCSINode)
newKeys := nt.driverTopologyKeys(newCSINode)
if reflect.DeepEqual(oldKeys, newKeys) {
// Shortcut: keys haven't changed, no need to sync.
return
}
klog.V(5).Infof("capacity topology: updated CSINode: %s", newCSINode.Name)
queue.Add("")
},
DeleteFunc: func(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
csiNode, ok := obj.(*storagev1.CSINode)
if !ok {
klog.Errorf("deleted object: expected CSINode, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: removed CSINode: %s", csiNode.Name)
queue.Add("")
},
}
csiNodeInformer.Informer().AddEventHandler(csiNodeHandler)

return nt
}

var _ Informer = &nodeTopology{}

type nodeTopology struct {
driverName string
client kubernetes.Interface
nodeInformer coreinformersv1.NodeInformer
csiNodeInformer storageinformersv1.CSINodeInformer
queue workqueue.RateLimitingInterface

mutex sync.Mutex
// segments holds a list of all currently known topology segments.
segments []*Segment
// callbacks contains all callbacks that need to be invoked
// after making changes to the list of known segments.
callbacks []Callback
}

// driverTopologyKeys returns nil if the driver is not running
// on the node, otherwise at least an empty slice of topology keys.
func (nt *nodeTopology) driverTopologyKeys(csiNode *storagev1.CSINode) []string {
for _, csiNodeDriver := range csiNode.Spec.Drivers {
if csiNodeDriver.Name == nt.driverName {
if csiNodeDriver.TopologyKeys == nil {
return []string{}
}
return csiNodeDriver.TopologyKeys
}
}
return nil
}

func (nt *nodeTopology) AddCallback(cb Callback) {
nt.mutex.Lock()
defer nt.mutex.Unlock()

nt.callbacks = append(nt.callbacks, cb)
}

func (nt *nodeTopology) List() []*Segment {
nt.mutex.Lock()
defer nt.mutex.Unlock()

// We need to return a new slice to protect against future
// changes in nt.segments. The segments themselves are
// immutable and shared.
segments := make([]*Segment, len(nt.segments))
copy(segments, nt.segments)
return segments
}

func (nt *nodeTopology) Run(ctx context.Context) {
go nt.nodeInformer.Informer().Run(ctx.Done())
go nt.csiNodeInformer.Informer().Run(ctx.Done())
go nt.runWorker(ctx)

klog.Info("Started node topology informer")
<-ctx.Done()
klog.Info("Shutting down node topology informer")
}

func (nt *nodeTopology) HasSynced() bool {
if nt.nodeInformer.Informer().HasSynced() &&
nt.csiNodeInformer.Informer().HasSynced() {
// Now that both informers are up-to-date, use that
// information to update our own view of the world.
nt.sync(context.Background())
return true
}
return false
}

func (nt *nodeTopology) runWorker(ctx context.Context) {
for nt.processNextWorkItem(ctx) {
}
}

func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := nt.queue.Get()
if shutdown {
return false
}
defer nt.queue.Done(obj)
nt.sync(ctx)
return true
}

func (nt *nodeTopology) sync(ctx context.Context) {
// For all nodes on which the driver is registered, collect the topology key/value pairs
// and sort them by key name to make the result deterministic. Skip all segments that have
// been seen before.
segments := nt.List()
removalCandidates := map[*Segment]bool{}
var addedSegments, removedSegments []*Segment
for _, segment := range segments {
// Assume that the segment is removed. Will be set to
// false if we find out otherwise.
removalCandidates[segment] = true
}

csiNodes, err := nt.csiNodeInformer.Lister().List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
existingSegments := make([]*Segment, 0, len(segments))
node:
for _, csiNode := range csiNodes {
topologyKeys := nt.driverTopologyKeys(csiNode)
if topologyKeys == nil {
// Driver not running on node, ignore it.
continue
}
node, err := nt.nodeInformer.Lister().Get(csiNode.Name)
if err != nil {
if apierrs.IsNotFound(err) {
// Obsolete CSINode object? Ignore it.
continue
}
// This shouldn't happen. If it does,
// something is very wrong and we give up.
utilruntime.HandleError(err)
return
}

newSegment := Segment{}
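// Sort a copy: the slice may belong to an object in the informer
// cache, which must not be modified.
topologyKeys = append([]string{}, topologyKeys...)
sort.Strings(topologyKeys)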
for _, key := range topologyKeys {
value, ok := node.Labels[key]
if !ok {
// The driver announced some topology key and kubelet recorded
// it in CSINode, but we haven't seen the corresponding
// node update yet as the label is not set. Ignore the node
// for now, we'll sync up when we get the node update.
continue node
}
newSegment = append(newSegment, SegmentEntry{key, value})
}

// Add it only if new, otherwise look at the next node.
for _, segment := range segments {
if newSegment.Compare(*segment) == 0 {
// Reuse a segment instead of using the new one. This keeps pointers stable.
removalCandidates[segment] = false
existingSegments = append(existingSegments, segment)
continue node
}
}
for _, segment := range addedSegments {
if newSegment.Compare(*segment) == 0 {
// We already discovered this new segment.
continue node
}
}

// A completely new segment.
addedSegments = append(addedSegments, &newSegment)
existingSegments = append(existingSegments, &newSegment)
}

// Lock while making changes, but unlock before actually invoking callbacks.
nt.mutex.Lock()
nt.segments = existingSegments

// Theoretically callbacks could change while we don't have
// the lock, so make a copy.
callbacks := make([]Callback, len(nt.callbacks))
copy(callbacks, nt.callbacks)
nt.mutex.Unlock()

for segment, wasRemoved := range removalCandidates {
if wasRemoved {
removedSegments = append(removedSegments, segment)
}
}
if len(addedSegments) > 0 || len(removedSegments) > 0 {
klog.V(5).Infof("topology changed: added %v, removed %v", addedSegments, removedSegments)
for _, cb := range callbacks {
cb(addedSegments, removedSegments)
}
} else {
klog.V(5).Infof("topology unchanged")
}
}
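
Review note: the core of `sync` is a diff between the previously known segments and the segments discovered now, with old pointers reused so that they stay valid as identifiers. The following is a simplified sketch of that idea, not the code from this commit: `segment` is reduced to a plain string, `diff` is a hypothetical helper, and, unlike the loop above, it adds a reused segment to the current list only once even when several nodes share it.

```go
package main

import "fmt"

// segment is a simplified stand-in for topology.Segment.
type segment string

// diff (hypothetical helper) compares the old segment pointers with the
// freshly discovered values. It returns the new current list plus the
// added and removed segments. Pointers from old are reused when the
// value still exists.
func diff(old []*segment, discovered []segment) (current, added, removed []*segment) {
	// Assume every old segment is gone until it is seen again.
	stale := make(map[*segment]bool, len(old))
	for _, s := range old {
		stale[s] = true
	}
next:
	for i := range discovered {
		d := discovered[i] // fresh variable per iteration, so &d is unique
		for _, s := range old {
			if *s == d {
				if stale[s] {
					stale[s] = false
					current = append(current, s)
				}
				continue next
			}
		}
		for _, s := range added {
			if *s == d {
				continue next // already discovered in this pass
			}
		}
		added = append(added, &d)
		current = append(current, &d)
	}
	for _, s := range old {
		if stale[s] {
			removed = append(removed, s)
		}
	}
	return current, added, removed
}

func main() {
	a, b := segment("zone=a"), segment("zone=b")
	current, added, removed := diff([]*segment{&a, &b}, []segment{"zone=b", "zone=c", "zone=c"})
	fmt.Println(len(current), *added[0], *removed[0])
}
```

Keeping pointers stable matters because consumers of the informer identify segments by address, as the `Informer` documentation in `topology.go` states.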
633 changes: 633 additions & 0 deletions pkg/capacity/topology/nodes_test.go

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions pkg/capacity/topology/topology.go
@@ -0,0 +1,137 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topology

import (
"context"
"fmt"
"sort"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Segment represents a topology segment. Entries are always sorted by
// key and keys are unique. In contrast to a map, segments therefore
// can be compared efficiently. A nil segment matches no nodes
// in a cluster, an empty segment all of them.
type Segment []SegmentEntry

var _ sort.Interface = Segment{}

// String returns the address *and* the content of the segment; the address
// is how the segment is identified when used as a hash key.
func (s *Segment) String() string {
return fmt.Sprintf("%p = %s", s, s.SimpleString())
}

// SimpleString only returns the content.
func (s *Segment) SimpleString() string {
var parts []string
for _, entry := range *s {
parts = append(parts, entry.String())
}
return strings.Join(parts, "+ ")
}

// Compare returns -1 if s is considered smaller than the other segment (fewer keys,
// keys and/or values smaller), 0 if equal and 1 otherwise.
func (s Segment) Compare(other Segment) int {
if len(s) < len(other) {
return -1
}
if len(s) > len(other) {
return 1
}
for i := 0; i < len(s); i++ {
cmp := s[i].Compare(other[i])
if cmp != 0 {
return cmp
}
}
return 0
}

func (s Segment) Len() int { return len(s) }
func (s Segment) Less(i, j int) bool { return s[i].Compare(s[j]) < 0 }
func (s Segment) Swap(i, j int) {
entry := s[i]
s[i] = s[j]
s[j] = entry
}

// SegmentEntry represents one topology key/value pair.
type SegmentEntry struct {
Key, Value string
}

func (se SegmentEntry) String() string {
return se.Key + ": " + se.Value
}

// Compare returns -1 if se is considered smaller than the other segment entry (key or value smaller),
// 0 if equal and 1 otherwise.
func (se SegmentEntry) Compare(other SegmentEntry) int {
cmp := strings.Compare(se.Key, other.Key)
if cmp != 0 {
return cmp
}
return strings.Compare(se.Value, other.Value)
}

// GetLabelSelector returns a LabelSelector with the key/value entries
// as label match criteria.
func (s Segment) GetLabelSelector() *metav1.LabelSelector {
return &metav1.LabelSelector{
MatchLabels: s.GetLabelMap(),
}
}

// GetLabelMap returns nil if the Segment itself is nil,
// otherwise a map with all key/value pairs.
func (s Segment) GetLabelMap() map[string]string {
if s == nil {
return nil
}
labels := map[string]string{}
for _, entry := range s {
labels[entry.Key] = entry.Value
}
return labels
}

// Informer keeps a list of discovered topology segments and can
// notify one or more clients when it discovers changes. Segments
// are identified by their address and guaranteed to be unique.
type Informer interface {
// AddCallback ensures that the function is called each time
// changes to the list of segments are detected. It also gets
// called immediately when adding the callback and there are
// already some known segments.
AddCallback(cb Callback)

// List returns all known segments, in no particular order.
List() []*Segment

// Run starts watching for changes.
Run(ctx context.Context)

// HasSynced returns true once all segments have been found.
HasSynced() bool
}

type Callback func(added []*Segment, removed []*Segment)
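
Review note: because entries are kept sorted by key, two segments built from the same labels compare as equal regardless of the order in which the labels were supplied. The sketch below illustrates this with local stand-ins; `entry`, `segment`, and the helper `fromLabels` are hypothetical names that do not exist in this package.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// entry and segment are simplified stand-ins for SegmentEntry and Segment.
type entry struct{ Key, Value string }
type segment []entry

// compare orders segments by length first, then entry by entry.
func (s segment) compare(other segment) int {
	if len(s) < len(other) {
		return -1
	}
	if len(s) > len(other) {
		return 1
	}
	for i := range s {
		if c := strings.Compare(s[i].Key, other[i].Key); c != 0 {
			return c
		}
		if c := strings.Compare(s[i].Value, other[i].Value); c != 0 {
			return c
		}
	}
	return 0
}

// fromLabels (hypothetical helper) builds a segment with entries sorted by key.
func fromLabels(labels map[string]string) segment {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	s := make(segment, 0, len(keys))
	for _, k := range keys {
		s = append(s, entry{k, labels[k]})
	}
	return s
}

func main() {
	a := fromLabels(map[string]string{"zone": "z1", "region": "r1"})
	b := fromLabels(map[string]string{"region": "r1", "zone": "z1"})
	c := fromLabels(map[string]string{"region": "r1", "zone": "z2"})
	fmt.Println(a.compare(b), a.compare(c)) // prints "0 -1"
}
```

A map would need a key-by-key lookup to establish equality; the sorted slice allows a single linear pass.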
99 changes: 99 additions & 0 deletions pkg/owner/owner.go
@@ -0,0 +1,99 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package owner contains code for walking up the ownership chain,
// starting with an arbitrary object. RBAC rules must allow GET access
// to each object on the chain, at least including the starting
// object, more when walking up more than one level.
package owner

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Lookup walks up the ownership chain zero or more levels and returns an OwnerReference for the
// object. The object identified by name, namespace and type is the starting point and is
// returned when levels is zero. Only APIVersion, Kind, Name, and UID will be set.
// Controller is always true.
func Lookup(config *rest.Config, namespace, name string, gkv schema.GroupVersionKind, levels int) (*metav1.OwnerReference, error) {
c, err := client.New(config, client.Options{})
if err != nil {
return nil, fmt.Errorf("build client: %v", err)
}

return lookupRecursive(c, namespace, name, gkv.Group, gkv.Version, gkv.Kind, levels)
}

func lookupRecursive(c client.Client, namespace, name, group, version, kind string, levels int) (*metav1.OwnerReference, error) {
u := &unstructured.Unstructured{}
apiVersion := metav1.GroupVersion{Group: group, Version: version}.String()
u.SetAPIVersion(apiVersion)
u.SetKind(kind)

if err := c.Get(context.Background(), client.ObjectKey{
Namespace: namespace,
Name: name,
}, u); err != nil {
return nil, fmt.Errorf("get object: %v", err)
}

if levels == 0 {
isTrue := true
return &metav1.OwnerReference{
APIVersion: apiVersion,
Kind: kind,
Name: name,
UID: u.GetUID(),
Controller: &isTrue,
}, nil
}
owners := u.GetOwnerReferences()
for _, owner := range owners {
if owner.Controller != nil && *owner.Controller {
gv, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
return nil, fmt.Errorf("parse OwnerReference.APIVersion: %v", err)
}
// With this special case here we avoid one lookup and thus the need for
// RBAC GET permission for the parent. For example, when a Pod is controlled
// by a StatefulSet, we only need GET permission for Pods (for the c.Get above)
// but not for StatefulSets.
if levels == 1 {
isTrue := true
return &metav1.OwnerReference{
APIVersion: owner.APIVersion,
Kind: owner.Kind,
Name: owner.Name,
UID: owner.UID,
Controller: &isTrue,
}, nil
}

return lookupRecursive(c, namespace, owner.Name,
gv.Group, gv.Version, owner.Kind,
levels-1)
}
}
return nil, fmt.Errorf("%s/%s %q in namespace %q has no controlling owner, cannot unwind the ownership further",
apiVersion, kind, name, namespace)
}
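
Review note: the `levels` semantics and the `levels == 1` shortcut can be illustrated without an API server. The sketch below is a simplified model under stated assumptions, not the implementation above: `object`, the in-memory `store`, and the helper `lookup` are hypothetical, and owners are identified by name only.

```go
package main

import "fmt"

// object is a simplified stand-in for an API object; controller is the
// name of its controlling owner, or empty if it has none.
type object struct {
	controller string
}

// lookup (hypothetical helper) walks up the given number of levels,
// starting at name. store plays the role of the objects that are
// readable via GET.
func lookup(store map[string]object, name string, levels int) (string, error) {
	obj, ok := store[name]
	if !ok {
		return "", fmt.Errorf("get object %q: not found", name)
	}
	if levels == 0 {
		return name, nil
	}
	if obj.controller == "" {
		return "", fmt.Errorf("%q has no controlling owner", name)
	}
	if levels == 1 {
		// Shortcut: the owner reference already identifies the parent,
		// so the parent itself does not need to be fetched.
		return obj.controller, nil
	}
	return lookup(store, obj.controller, levels-1)
}

func main() {
	// The deployment is deliberately absent from the store.
	store := map[string]object{
		"pod":        {controller: "replicaset"},
		"replicaset": {controller: "deployment"},
	}
	owner, err := lookup(store, "pod", 2)
	fmt.Println(owner, err)

	_, err = lookup(store, "pod", 3)
	fmt.Println(err != nil)
}
```

In this model, walking up two levels from the pod succeeds even though the deployment cannot be read, which corresponds to the "grandparent" case in `owner_test.go`.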
194 changes: 194 additions & 0 deletions pkg/owner/owner_test.go
@@ -0,0 +1,194 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package owner

import (
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var (
testNamespace = "test-namespace"
otherNamespace = "other-namespace"
statefulsetGkv = schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "StatefulSet",
}
deploymentGkv = schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
}
replicasetGkv = schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "ReplicaSet",
}
podGkv = schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
}

pod = makeObject(testNamespace, "foo", podGkv, nil)
statefulset = makeObject(testNamespace, "foo", statefulsetGkv, nil)
statefulsetPod = makeObject(testNamespace, "foo", podGkv, &statefulset)
deployment = makeObject(testNamespace, "foo", deploymentGkv, nil)
replicaset = makeObject(testNamespace, "foo", replicasetGkv, &deployment)
otherReplicaset = makeObject(testNamespace, "bar", replicasetGkv, &deployment)
yetAnotherReplicaset = makeObject(otherNamespace, "foo", replicasetGkv, &deployment)
deploymentsetPod = makeObject(testNamespace, "foo", podGkv, &replicaset)
)

// TestNodeTopology checks that lookupRecursive walks up the ownership
// chain and returns the expected owner reference.
func TestNodeTopology(t *testing.T) {
testcases := map[string]struct {
objects []runtime.Object
start unstructured.Unstructured
levels int
expectError bool
expectOwner unstructured.Unstructured
}{
"empty": {
start: pod,
expectError: true,
},
"pod-itself": {
objects: []runtime.Object{&pod},
start: pod,
levels: 0,
expectOwner: pod,
},
"no-parent": {
objects: []runtime.Object{&pod},
start: pod,
levels: 1,
expectError: true,
},
"parent": {
objects: []runtime.Object{&statefulsetPod},
start: statefulsetPod,
levels: 1,
// The object doesn't have to exist.
expectOwner: statefulset,
},
"missing-parent": {
objects: []runtime.Object{&deploymentsetPod},
start: deploymentsetPod,
levels: 2,
expectError: true,
},
"wrong-parent": {
objects: []runtime.Object{&deploymentsetPod, &otherReplicaset},
start: deploymentsetPod,
levels: 2,
expectError: true,
},
"another-wrong-parent": {
objects: []runtime.Object{&deploymentsetPod, &yetAnotherReplicaset},
start: deploymentsetPod,
levels: 2,
expectError: true,
},
"grandparent": {
objects: []runtime.Object{&deploymentsetPod, &replicaset},
start: deploymentsetPod,
levels: 2,
// The object doesn't have to exist.
expectOwner: deployment,
},
}

for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
c := fake.NewFakeClient(tc.objects...)
gkv := tc.start.GroupVersionKind()
ownerRef, err := lookupRecursive(c,
tc.start.GetNamespace(),
tc.start.GetName(),
gkv.Group,
gkv.Version,
gkv.Kind,
tc.levels)
if err != nil && !tc.expectError {
t.Fatalf("unexpected error: %v", err)
}
if err == nil && tc.expectError {
t.Fatal("unexpected success")
}
if err == nil {
if ownerRef == nil {
t.Fatal("unexpected nil owner")
}
gkv := tc.expectOwner.GroupVersionKind()
apiVersion := metav1.GroupVersion{Group: gkv.Group, Version: gkv.Version}.String()
if ownerRef.APIVersion != apiVersion {
t.Errorf("expected APIVersion %q, got %q", apiVersion, ownerRef.APIVersion)
}
if ownerRef.Kind != gkv.Kind {
t.Errorf("expected Kind %q, got %q", gkv.Kind, ownerRef.Kind)
}
if ownerRef.Name != tc.expectOwner.GetName() {
t.Errorf("expected Name %q, got %q", tc.expectOwner.GetName(), ownerRef.Name)
}
if ownerRef.UID != tc.expectOwner.GetUID() {
t.Errorf("expected UID %q, got %q", tc.expectOwner.GetUID(), ownerRef.UID)
}
if ownerRef.Controller == nil || !*ownerRef.Controller {
t.Error("Controller field should be true")
}
if ownerRef.BlockOwnerDeletion != nil && *ownerRef.BlockOwnerDeletion {
t.Error("BlockOwnerDeletion field should be false")
}
}
})
}
}

var uidCounter int

func makeObject(namespace, name string, gkv schema.GroupVersionKind, owner *unstructured.Unstructured) unstructured.Unstructured {
u := unstructured.Unstructured{}
u.SetNamespace(namespace)
u.SetName(name)
u.SetGroupVersionKind(gkv)
uidCounter++
u.SetUID(types.UID(fmt.Sprintf("FAKE-UID-%d", uidCounter)))
if owner != nil {
isTrue := true
u.SetOwnerReferences([]metav1.OwnerReference{
{
APIVersion: owner.GetAPIVersion(),
Kind: owner.GetKind(),
Name: owner.GetName(),
UID: owner.GetUID(),
Controller: &isTrue,
},
})
}
return u
}
2 changes: 2 additions & 0 deletions vendor/github.com/hashicorp/golang-lru/go.mod
48 changes: 41 additions & 7 deletions vendor/github.com/hashicorp/golang-lru/lru.go
16 changes: 16 additions & 0 deletions vendor/github.com/hashicorp/golang-lru/simplelru/lru.go
8 changes: 2 additions & 6 deletions vendor/golang.org/x/net/http2/client_conn_pool.go
2 changes: 2 additions & 0 deletions vendor/golang.org/x/net/http2/flow.go
7 changes: 7 additions & 0 deletions vendor/golang.org/x/net/http2/hpack/huffman.go
7 changes: 7 additions & 0 deletions vendor/golang.org/x/net/http2/http2.go
8 changes: 5 additions & 3 deletions vendor/golang.org/x/net/http2/server.go
12 changes: 4 additions & 8 deletions vendor/golang.org/x/net/http2/transport.go
5 changes: 2 additions & 3 deletions vendor/golang.org/x/net/ipv4/header.go
1 change: 1 addition & 0 deletions vendor/gopkg.in/yaml.v2/apic.go
127 changes: 127 additions & 0 deletions vendor/k8s.io/apimachinery/pkg/util/rand/rand.go
119 changes: 119 additions & 0 deletions vendor/k8s.io/client-go/restmapper/category_expansion.go
338 changes: 338 additions & 0 deletions vendor/k8s.io/client-go/restmapper/discovery.go
172 changes: 172 additions & 0 deletions vendor/k8s.io/client-go/restmapper/shortcut.go
13 changes: 10 additions & 3 deletions vendor/modules.txt
@@ -42,7 +42,7 @@ github.com/google/uuid
github.com/googleapis/gnostic/compiler
github.com/googleapis/gnostic/extensions
github.com/googleapis/gnostic/openapiv2
# github.com/hashicorp/golang-lru v0.5.1
# github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru
github.com/hashicorp/golang-lru/simplelru
# github.com/imdario/mergo v0.3.9
@@ -96,7 +96,7 @@ github.com/spf13/pflag
golang.org/x/crypto/ed25519
golang.org/x/crypto/ed25519/internal/edwards25519
golang.org/x/crypto/ssh/terminal
# golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
# golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
golang.org/x/net/bpf
golang.org/x/net/context
golang.org/x/net/context/ctxhttp
@@ -212,7 +212,7 @@ google.golang.org/protobuf/types/known/timestamppb
google.golang.org/protobuf/types/known/wrapperspb
# gopkg.in/inf.v0 v0.9.1
gopkg.in/inf.v0
# gopkg.in/yaml.v2 v2.2.8
# gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v2
# k8s.io/api v0.19.0-rc.2 => k8s.io/api v0.19.0-rc.2
k8s.io/api/admissionregistration/v1
@@ -288,6 +288,7 @@ k8s.io/apimachinery/pkg/util/json
k8s.io/apimachinery/pkg/util/mergepatch
k8s.io/apimachinery/pkg/util/naming
k8s.io/apimachinery/pkg/util/net
k8s.io/apimachinery/pkg/util/rand
k8s.io/apimachinery/pkg/util/runtime
k8s.io/apimachinery/pkg/util/sets
k8s.io/apimachinery/pkg/util/strategicpatch
@@ -493,6 +494,7 @@ k8s.io/client-go/plugin/pkg/client/auth/exec
k8s.io/client-go/rest
k8s.io/client-go/rest/fake
k8s.io/client-go/rest/watch
k8s.io/client-go/restmapper
k8s.io/client-go/testing
k8s.io/client-go/tools/auth
k8s.io/client-go/tools/cache
@@ -538,6 +540,11 @@ k8s.io/kubernetes/pkg/apis/core/helper
k8s.io/utils/buffer
k8s.io/utils/integer
k8s.io/utils/trace
# sigs.k8s.io/controller-runtime v0.6.2
sigs.k8s.io/controller-runtime/pkg/client
sigs.k8s.io/controller-runtime/pkg/client/apiutil
sigs.k8s.io/controller-runtime/pkg/client/fake
sigs.k8s.io/controller-runtime/pkg/internal/objectutil
# sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1
sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller
sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller/metrics
201 changes: 201 additions & 0 deletions vendor/sigs.k8s.io/controller-runtime/LICENSE