diff --git a/.gitignore b/.gitignore
index 295e96b..c686aef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 /bin
 /test/e2e/e2e.test
 /test/e2e/ginkgo
+/cloudstack.ini
+/.idea
 cloud-config
diff --git a/README.md b/README.md
index 1783145..7f48541 100644
--- a/README.md
+++ b/README.md
@@ -40,6 +40,7 @@ api-url =
 api-key =
 secret-key =
 ssl-no-verify =
+project-id =
 ```
 
 Create a secret named `cloudstack-secret` in namespace `kube-system`:
@@ -51,6 +52,18 @@ kubectl create secret generic \
   cloudstack-secret
 ```
 
+Set the correct hypervisor in the DaemonSet environment variables:
+```
+        - name: NODE_HYPERVISOR
+          value: vmware
+```
+
+You can manually set the maximum number of attachable block volumes per node:
+```
+        - name: NODE_MAX_BLOCK_VOLUMES
+          value: "15" # Default is 10 volumes per node
+```
+
 If you have also deployed the
 [CloudStack Kubernetes Provider](https://github.com/apache/cloudstack-kubernetes-provider),
 you may use the same secret for both tools.
@@ -88,11 +101,19 @@ disk offerings to Kubernetes storage classes.
 
 Example:
 
-```
+```bash
 kubectl apply -f ./examples/k8s/pvc.yaml
 kubectl apply -f ./examples/k8s/pod.yaml
 ```
 
+#### Reusing volumes
+
+1. Patch the PV's `reclaimPolicy` with `kubectl patch pv my-pv-name -p '{"spec":{"persistentVolumeReclaimPolicy":"Retain"}}'`
+2. Delete the old Pod and PVC
+3. Patch the PV's `claimRef` with `kubectl patch pv my-pv-name -p '{"spec":{"claimRef": null}}'`
+4. Create a new Pod and PVC that reference the existing PV (set `.spec.volumeName = my-pv-name` on the new PVC)
+
 ## Building
 
 To build the driver binary:
diff --git a/clean-scsi-bus.sh b/clean-scsi-bus.sh
new file mode 100755
index 0000000..466c475
--- /dev/null
+++ b/clean-scsi-bus.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+echo 1 > "/sys/class/scsi_device/0:0:$1:0/device/delete"
\ No newline at end of file
diff --git a/cmd/cloudstack-csi-driver/Dockerfile b/cmd/cloudstack-csi-driver/Dockerfile
index a5eaccc..4d4f3f9 100644
--- a/cmd/cloudstack-csi-driver/Dockerfile
+++ b/cmd/cloudstack-csi-driver/Dockerfile
@@ -11,7 +11,13 @@ RUN apk add --no-cache \
     # Provides mkfs.xfs
     xfsprogs \
     # Provides blkid, also used by k8s.io/mount-utils
-    blkid
+    blkid \
+    # Provides udevadm
+    eudev \
+    bash
 
 COPY ./bin/cloudstack-csi-driver /cloudstack-csi-driver
+COPY rescan-scsi-bus.sh /usr/bin/
+RUN chmod +x /usr/bin/rescan-scsi-bus.sh
+COPY clean-scsi-bus.sh /usr/bin/
+RUN chmod +x /usr/bin/clean-scsi-bus.sh
 
 ENTRYPOINT ["/cloudstack-csi-driver"]
\ No newline at end of file
diff --git a/deploy/k8s/node-daemonset.yaml b/deploy/k8s/node-daemonset.yaml
index 73f4d97..4027745 100644
--- a/deploy/k8s/node-daemonset.yaml
+++ b/deploy/k8s/node-daemonset.yaml
@@ -2,101 +2,134 @@ apiVersion: apps/v1
 kind: DaemonSet
 metadata:
   name: cloudstack-csi-node
-  namespace: kube-system
 spec:
+  revisionHistoryLimit: 10
   selector:
     matchLabels:
      app.kubernetes.io/name: cloudstack-csi-node
-  updateStrategy:
-    type: RollingUpdate
   template:
     metadata:
      labels:
        app.kubernetes.io/name: cloudstack-csi-node
        app.kubernetes.io/part-of: cloudstack-csi-driver
    spec:
-      nodeSelector:
-        kubernetes.io/os: linux
-      tolerations:
-        - effect: NoExecute
-          operator: Exists
-        - effect: NoSchedule
-          operator: Exists
      containers:
-        - name: cloudstack-csi-node
-          image: cloudstack-csi-driver
-          imagePullPolicy: Always
-          args:
-            - "-endpoint=$(CSI_ENDPOINT)"
-            - "-cloudstackconfig=/etc/cloudstack-csi-driver/cloud-config"
-            - "-nodeName=$(NODE_NAME)"
-            - "-debug"
+        - args:
+            - -endpoint=$(CSI_ENDPOINT)
+            - -cloudstackconfig=/etc/cloudstack-csi-driver/cloudstack.ini
+            - -nodeName=$(NODE_NAME)
+            - -debug
          env:
            - name: CSI_ENDPOINT
              value: unix:///csi/csi.sock
+            - name: NODE_HYPERVISOR
+              value: vmware
+            - name: NODE_MAX_BLOCK_VOLUMES
+              value: "10"
            - name: NODE_NAME
              valueFrom:
                fieldRef:
+                  apiVersion: v1
                  fieldPath: spec.nodeName
+          image: cloudstack-csi-driver
+          imagePullPolicy: Always
+          name: cloudstack-csi-node
+          resources: {}
          securityContext:
            privileged: true
+          terminationMessagePath: /dev/termination-log
+          terminationMessagePolicy: File
          volumeMounts:
-            - name: plugin-dir
-              mountPath: /csi
-            - name: kubelet-dir
-              mountPath: /var/lib/kubelet
-              # needed so that any mounts setup inside this container are
-              # propagated back to the host machine.
-              mountPropagation: Bidirectional
-            - name: device-dir
-              mountPath: /dev
-            - name: cloud-init-dir
-              mountPath: /run/cloud-init/
-            - name: cloudstack-conf
-              mountPath: /etc/cloudstack-csi-driver
-
-        - name: node-driver-registrar
-          image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.0.1
-          imagePullPolicy: IfNotPresent
-          args:
-            - "--csi-address=$(ADDRESS)"
-            - "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
-            - "--v=5"
+            - mountPath: /csi
+              name: plugin-dir
+            - mountPath: /var/lib/kubelet
+              mountPropagation: Bidirectional
+              name: kubelet-dir
+            - mountPath: /dev
+              name: device-dir
+            - mountPath: /run/cloud-init/
+              name: cloud-init-dir
+            - mountPath: /etc/cloudstack-csi-driver
+              name: cloudstack-conf
+            - mountPath: /sys/class/scsi_host/
+              name: sys-class-scsi-host-dir
+            - mountPath: /sys/class/scsi_device/
+              name: sys-class-scsi-device-dir
+            - mountPath: /sys/devices
+              name: sys-devices
+        - args:
+            - --csi-address=$(ADDRESS)
+            - --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)
+            - --v=5
          env:
            - name: ADDRESS
              value: /csi/csi.sock
            - name: DRIVER_REG_SOCK_PATH
              value: /var/lib/kubelet/plugins/csi.cloudstack.apache.org/csi.sock
+          image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.0.1
+          imagePullPolicy: IfNotPresent
+          name: node-driver-registrar
+          resources: {}
          securityContext:
            privileged: true
+          terminationMessagePath: /dev/termination-log
+          terminationMessagePolicy: File
          volumeMounts:
-            - name: plugin-dir
-              mountPath: /csi
-            - name: registration-dir
-              mountPath: /registration
-
+            - mountPath: /csi
+              name: plugin-dir
+            - mountPath: /registration
+              name: registration-dir
+      dnsPolicy: ClusterFirst
+      nodeSelector:
+        kubernetes.io/os: linux
+      restartPolicy: Always
+      schedulerName: default-scheduler
+      serviceAccount: cloudstack-csi-node
+      serviceAccountName: cloudstack-csi-node
+      terminationGracePeriodSeconds: 30
+      tolerations:
+        - effect: NoExecute
+          operator: Exists
+        - effect: NoSchedule
+          operator: Exists
      volumes:
-        - name: plugin-dir
-          hostPath:
+        - hostPath:
            path: /var/lib/kubelet/plugins/csi.cloudstack.apache.org/
            type: DirectoryOrCreate
-        - name: kubelet-dir
-          hostPath:
+          name: plugin-dir
+        - hostPath:
            path: /var/lib/kubelet
            type: Directory
-        - name: device-dir
-          hostPath:
+          name: kubelet-dir
+        - hostPath:
            path: /dev
            type: Directory
-        - name: registration-dir
-          hostPath:
+          name: device-dir
+        - hostPath:
            path: /var/lib/kubelet/plugins_registry
            type: Directory
-        - name: cloud-init-dir
-          hostPath:
+          name: registration-dir
+        - hostPath:
            path: /run/cloud-init/
            type: Directory
+          name: cloud-init-dir
+        - hostPath:
+            path: /sys/class/scsi_host
+            type: Directory
+          name: sys-class-scsi-host-dir
+        - hostPath:
+            path: /sys/class/scsi_device
+            type: Directory
+          name: sys-class-scsi-device-dir
+        - hostPath:
+            path: /sys/devices
+            type: Directory
+          name: sys-devices
        - name: cloudstack-conf
          secret:
-            secretName: cloudstack-secret
+            defaultMode: 420
+            secretName: csi-cloudstack-secret
+  updateStrategy:
+    rollingUpdate:
+      maxUnavailable: 1
+    type: RollingUpdate
diff --git a/go.mod b/go.mod
index bb5e1a3..7163562 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
    github.com/hashicorp/go-uuid v1.0.2
    github.com/kubernetes-csi/csi-test/v4 v4.2.0
    go.uber.org/zap v1.16.0
+   golang.org/x/sys v0.0.0-20210510120138-977fb7262007
    golang.org/x/text v0.3.6
    google.golang.org/genproto v0.0.0-20210726200206-e7812ac95cc0 // indirect
    google.golang.org/grpc v1.39.0
diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index c376303..e6a6798 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -16,9 +16,13 @@ type Interface interface {
    ListZonesID(ctx context.Context) ([]string, error)
 
+   GetDomainID(ctx context.Context) (string, error)
+   GetProjectID() string
+
    GetVolumeByID(ctx context.Context, volumeID string) (*Volume, error)
    GetVolumeByName(ctx context.Context, name string) (*Volume, error)
-   CreateVolume(ctx context.Context, diskOfferingID, zoneID, name string, sizeInGB int64) (string, error)
+   ListVolumesForVM(ctx context.Context, virtualMachineID, projectID string) ([]*Volume, error)
+   CreateVolume(ctx context.Context, diskOfferingID, projectID, domainID, zoneID, name string, sizeInGB int64) (string, error)
    DeleteVolume(ctx context.Context, id string) error
    AttachVolume(ctx context.Context, volumeID, vmID string) (string, error)
    DetachVolume(ctx context.Context, volumeID string) error
@@ -37,6 +41,7 @@ type Volume struct {
 
    VirtualMachineID string
    DeviceID         string
+   Hypervisor       string
 }
 
 // VM represents a CloudStack Virtual Machine.
@@ -54,10 +59,11 @@ var (
 // client is the implementation of Interface.
 type client struct {
    *cloudstack.CloudStackClient
+   ProjectID string
 }
 
 // New creates a new cloud connector, given its configuration.
 func New(config *Config) Interface {
    csClient := cloudstack.NewAsyncClient(config.APIURL, config.APIKey, config.SecretKey, config.VerifySSL)
-   return &client{csClient}
+   return &client{csClient, config.ProjectID}
 }
diff --git a/pkg/cloud/config.go b/pkg/cloud/config.go
index 691be99..069de49 100644
--- a/pkg/cloud/config.go
+++ b/pkg/cloud/config.go
@@ -12,6 +12,7 @@ type Config struct {
    APIKey    string
    SecretKey string
    VerifySSL bool
+   ProjectID string
 }
 
 // csConfig wraps the config for the CloudStack cloud provider.
@@ -42,5 +43,6 @@ func ReadConfig(configFilePath string) (*Config, error) {
        APIKey:    cfg.Global.APIKey,
        SecretKey: cfg.Global.SecretKey,
        VerifySSL: cfg.Global.SSLNoVerify,
+       ProjectID: cfg.Global.ProjectID,
    }, nil
 }
diff --git a/pkg/cloud/fake/fake.go b/pkg/cloud/fake/fake.go
index 14d20a4..6249885 100644
--- a/pkg/cloud/fake/fake.go
+++ b/pkg/cloud/fake/fake.go
@@ -73,7 +73,7 @@ func (f *fakeConnector) GetVolumeByName(ctx context.Context, name string) (*clou
    return nil, cloud.ErrNotFound
 }
 
-func (f *fakeConnector) CreateVolume(ctx context.Context, diskOfferingID, zoneID, name string, sizeInGB int64) (string, error) {
+func (f *fakeConnector) CreateVolume(ctx context.Context, diskOfferingID, projectID, domainID, zoneID, name string, sizeInGB int64) (string, error) {
    id, _ := uuid.GenerateUUID()
    vol := cloud.Volume{
        ID: id,
@@ -101,3 +101,11 @@ func (f *fakeConnector) AttachVolume(ctx context.Context, volumeID, vmID string)
 }
 
 func (f *fakeConnector) DetachVolume(ctx context.Context, volumeID string) error { return nil }
+
+func (f *fakeConnector) GetDomainID(ctx context.Context) (string, error) {
+   return "domain", nil
+}
+
+func (f *fakeConnector) GetProjectID() string {
+   return "test"
+}
diff --git a/pkg/cloud/project.go b/pkg/cloud/project.go
new file mode 100644
index 0000000..1304267
--- /dev/null
+++ b/pkg/cloud/project.go
@@ -0,0 +1,23 @@
+package cloud
+
+import (
+   "context"
+   "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
+)
+
+func (c *client) GetDomainID(ctx context.Context) (string, error) {
+   p, _, err := c.Project.GetProjectByID(c.ProjectID)
+   ctxzap.Extract(ctx).Sugar().Infow("CloudStack API call", "command", "GetProjectByID", "params", map[string]string{
+       "projectID": c.ProjectID,
+   })
+
+   if err != nil {
+       return "", err
+   }
+
+   return p.Domainid, nil
+}
+
+func (c *client) GetProjectID() string {
+   return c.ProjectID
+}
diff --git a/pkg/cloud/vms.go b/pkg/cloud/vms.go
index 29124c3..7a08c92 100644
--- a/pkg/cloud/vms.go
+++ b/pkg/cloud/vms.go
@@ -23,6 +23,7 @@ func (c *client) GetVMByID(ctx context.Context, vmID string) (*VM, error) {
        return nil, ErrTooManyResults
    }
    vm := l.VirtualMachines[0]
+
    return &VM{
        ID:     vm.Id,
        ZoneID: vm.Zoneid,
diff --git a/pkg/cloud/volumes.go b/pkg/cloud/volumes.go
index 394be39..335dda7 100644
--- a/pkg/cloud/volumes.go
+++ b/pkg/cloud/volumes.go
@@ -32,6 +32,7 @@ func (c *client) GetVolumeByID(ctx context.Context, volumeID string) (*Volume, e
        DiskOfferingID:   vol.Diskofferingid,
        ZoneID:           vol.Zoneid,
        VirtualMachineID: vol.Virtualmachineid,
+       Hypervisor:       vol.Hypervisor,
        DeviceID:         strconv.FormatInt(vol.Deviceid, 10),
    }
    return &v, nil
@@ -62,16 +63,26 @@ func (c *client) GetVolumeByName(ctx context.Context, name string) (*Volume, err
        ZoneID:           vol.Zoneid,
        VirtualMachineID: vol.Virtualmachineid,
        DeviceID:         strconv.FormatInt(vol.Deviceid, 10),
+       Hypervisor:       vol.Hypervisor,
    }
+
    return &v, nil
 }
 
-func (c *client) CreateVolume(ctx context.Context, diskOfferingID, zoneID, name string, sizeInGB int64) (string, error) {
+func (c *client) CreateVolume(ctx context.Context, diskOfferingID, projectID, domainID, zoneID, name string, sizeInGB int64) (string, error) {
    p := c.Volume.NewCreateVolumeParams()
    p.SetDiskofferingid(diskOfferingID)
    p.SetZoneid(zoneID)
    p.SetName(name)
    p.SetSize(sizeInGB)
+   if domainID != "" {
+       p.SetDomainid(domainID)
+   }
+
+   if projectID != "" {
+       p.SetProjectid(projectID)
+   }
+
    ctxzap.Extract(ctx).Sugar().Infow("CloudStack API call", "command", "CreateVolume", "params", map[string]string{
        "diskofferingid": diskOfferingID,
        "zoneid":         zoneID,
@@ -120,3 +131,35 @@ func (c *client) DetachVolume(ctx context.Context, volumeID string) error {
    _, err := c.Volume.DetachVolume(p)
    return err
 }
+
+func (c *client) ListVolumesForVM(ctx context.Context, virtualMachineID, projectID string) ([]*Volume, error) {
+   ctxzap.Extract(ctx).Sugar().Infow("CloudStack API call", "command", "ListVolumes", "params", map[string]string{
+       "virtualmachineid": virtualMachineID,
+       "projectid":        projectID,
+   })
+   p := c.Volume.NewListVolumesParams()
+   p.SetVirtualmachineid(virtualMachineID)
+   p.SetProjectid(projectID)
+
+   l, err := c.Volume.ListVolumes(p)
+   if err != nil {
+       return nil, err
+   }
+
+   volumes := make([]*Volume, len(l.Volumes))
+   for i := range l.Volumes {
+       vol := l.Volumes[i]
+       volumes[i] = &Volume{
+           ID:               vol.Id,
+           Name:             vol.Name,
+           Size:             vol.Size,
+           DiskOfferingID:   vol.Diskofferingid,
+           ZoneID:           vol.Zoneid,
+           VirtualMachineID: vol.Virtualmachineid,
+           Hypervisor:       vol.Hypervisor,
+           DeviceID:         strconv.FormatInt(vol.Deviceid, 10),
+       }
+   }
+
+   return volumes, nil
+}
diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go
index fc189eb..4fd7650 100644
--- a/pkg/driver/controller.go
+++ b/pkg/driver/controller.go
@@ -3,7 +3,9 @@ package driver
 import (
    "context"
    "fmt"
    "math/rand"
+   "sync"
 
    "github.com/container-storage-interface/spec/lib/go/csi"
+   "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
    "google.golang.org/grpc/codes"
@@ -24,17 +26,19 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{
 type controllerServer struct {
    csi.UnimplementedControllerServer
    connector cloud.Interface
+   lockMu    sync.Mutex
+   locks     map[string]*sync.Mutex
 }
 
 // NewControllerServer creates a new Controller gRPC server.
 func NewControllerServer(connector cloud.Interface) csi.ControllerServer {
    return &controllerServer{
        connector: connector,
+       locks:     make(map[string]*sync.Mutex),
    }
 }
 
 func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
-
    // Check arguments
    if req.GetName() == "" {
@@ -115,7 +118,18 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
        zoneID = t.ZoneID
    }
 
-   volID, err := cs.connector.CreateVolume(ctx, diskOfferingID, zoneID, name, sizeInGB)
+   projectID := cs.connector.GetProjectID()
+   domainID := ""
+
+   if projectID != "" {
+       domainID, err = cs.connector.GetDomainID(ctx)
+       if err != nil {
+           ctxzap.Extract(ctx).Sugar().Debugf("could not get the domain ID for project %s; creating volume without a domain ID", projectID)
+       }
+   }
+
+   volID, err := cs.connector.CreateVolume(ctx, diskOfferingID, projectID, domainID, zoneID, name, sizeInGB)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Cannot create volume %s: %v", name, err.Error())
    }
@@ -204,7 +218,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
    // Check arguments
-
    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }
@@ -215,6 +228,15 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
    }
 
    nodeID := req.GetNodeId()
+   // Ensure only one publish operation per node is processed at a time;
+   // cs.lockMu guards the locks map itself against concurrent access.
+   cs.lockMu.Lock()
+   lock, ok := cs.locks[nodeID]
+   if !ok {
+       lock = &sync.Mutex{}
+       cs.locks[nodeID] = lock
+   }
+   cs.lockMu.Unlock()
+   lock.Lock()
+   defer lock.Unlock()
+
    if req.GetReadonly() {
        return nil, status.Error(codes.InvalidArgument, "Readonly not possible")
    }
@@ -236,7 +258,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
    }
 
    if vol.VirtualMachineID != "" && vol.VirtualMachineID != nodeID {
-       return nil, status.Error(codes.AlreadyExists, "Volume already assigned")
+       return nil, status.Error(codes.AlreadyExists, "Volume already assigned to another node")
    }
 
    if _, err := cs.connector.GetVMByID(ctx, nodeID); err == cloud.ErrNotFound {
@@ -246,8 +268,16 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
        return nil, status.Errorf(codes.Internal, "Error %v", err)
    }
 
+   projectID := cs.connector.GetProjectID()
+
+   vols, err := cs.connector.ListVolumesForVM(ctx, nodeID, projectID)
+   if err != nil {
+       return nil, status.Errorf(codes.Internal, "Cannot list volumes for node %s: %v", nodeID, err)
+   }
+
    if vol.VirtualMachineID == nodeID {
        // volume already attached
+       ctxzap.Extract(ctx).Sugar().Debugf("volume %s is already attached on node %s", volumeID, nodeID)
 
        publishContext := map[string]string{
            deviceIDContextKey: vol.DeviceID,
@@ -255,11 +285,20 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
        return &csi.ControllerPublishVolumeResponse{PublishContext: publishContext}, nil
    }
 
+   // Check the node's maximum block volume limit before attaching
+   if maxVols := getMaxAllowedVolumes(); len(vols) >= maxVols {
+       return nil, status.Errorf(codes.ResourceExhausted, "Maximum allowed volumes (%d/%d) per node reached. Could not attach volume %s", len(vols), maxVols, volumeID)
+   }
+
    deviceID, err := cs.connector.AttachVolume(ctx, volumeID, nodeID)
    if err != nil {
+       ctxzap.Extract(ctx).Sugar().Errorf("attaching volume %s to node %s failed (it may still be attached to node %s)", volumeID, nodeID, vol.VirtualMachineID)
+
        return nil, status.Errorf(codes.Internal, "Cannot attach volume %s: %s", volumeID, err.Error())
    }
 
+   ctxzap.Extract(ctx).Sugar().Debugf("volume %s attached successfully on node %s", volumeID, nodeID)
+
    publishContext := map[string]string{
        deviceIDContextKey: deviceID,
    }
@@ -279,6 +318,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
    }
 
    nodeID := req.GetNodeId()
+   ctxzap.Extract(ctx).Sugar().Debugf("ControllerUnpublishVolume: trying to unpublish volume %s on node %s", volumeID, nodeID)
 
    // Check volume
    if vol, err := cs.connector.GetVolumeByID(ctx, volumeID); err == cloud.ErrNotFound {
        return nil, status.Errorf(codes.NotFound, "Volume %v not found", volumeID)
@@ -310,6 +350,8 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
        return nil, status.Errorf(codes.Internal, "Cannot detach volume %s: %s", volumeID, err.Error())
    }
 
+   ctxzap.Extract(ctx).Sugar().Debugf("ControllerUnpublishVolume: volume %s detached successfully from node %s", volumeID, nodeID)
+
    return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
diff --git a/pkg/driver/node.go b/pkg/driver/node.go
index 7fe72ad..7dee048 100644
--- a/pkg/driver/node.go
+++ b/pkg/driver/node.go
@@ -5,6 +5,8 @@ import (
    "fmt"
    "os"
    "path/filepath"
+   "strconv"
+   "strings"
 
    "github.com/container-storage-interface/spec/lib/go/csi"
    "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
@@ -17,25 +19,47 @@ import (
 const (
    // default file system type to be used when it is not provided
    defaultFsType = "ext4"
+   // default maximum number of block volumes per node; can be
+   // overridden with the NODE_MAX_BLOCK_VOLUMES environment variable
+   maxAllowedBlockVolumesPerNode = 10
 )
 
 type nodeServer struct {
    csi.UnimplementedNodeServer
-   connector cloud.Interface
-   mounter   mount.Interface
-   nodeName  string
+   connector                     cloud.Interface
+   mounter                       mount.Interface
+   nodeName                      string
+   hypervisor                    string
+   maxAllowedBlockVolumesPerNode int
 }
 
 // NewNodeServer creates a new Node gRPC server.
 func NewNodeServer(connector cloud.Interface, mounter mount.Interface, nodeName string) csi.NodeServer {
+   hypervisor, ok := os.LookupEnv("NODE_HYPERVISOR")
+   if !ok {
+       panic("Environment variable NODE_HYPERVISOR must be set")
+   }
+
+   if strings.ToLower(hypervisor) != "vmware" && strings.ToLower(hypervisor) != "kvm" {
+       panic("Environment variable NODE_HYPERVISOR must be 'vmware' or 'kvm'")
+   }
+
+   maxVolumesStr, ok := os.LookupEnv("NODE_MAX_BLOCK_VOLUMES")
+   if ok {
+       if _, err := strconv.Atoi(maxVolumesStr); err != nil {
+           panic("Environment variable NODE_MAX_BLOCK_VOLUMES must be an integer: " + err.Error())
+       }
+   }
+
    if mounter == nil {
        mounter = mount.New()
    }
    return &nodeServer{
-       connector: connector,
-       mounter:   mounter,
-       nodeName:  nodeName,
+       connector:                     connector,
+       mounter:                       mounter,
+       nodeName:                      nodeName,
+       hypervisor:                    hypervisor,
+       maxAllowedBlockVolumesPerNode: getMaxAllowedVolumes(),
    }
 }
@@ -61,11 +85,14 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
        return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
    }
 
+   ctxzap.Extract(ctx).Sugar().Infof("NodeStageVolume: staging volume %s at target %s", volumeID, target)
+
    // Now, find the device path
+   v, err := ns.connector.GetVolumeByID(ctx, volumeID)
+   if err != nil {
+       return nil, status.Errorf(codes.Internal, "Cannot find volume %s: %v", volumeID, err)
+   }
+
    deviceID := req.PublishContext[deviceIDContextKey]
-   devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID)
+   devicePath, err := ns.mounter.GetDevicePath(ctx, v.DeviceID, ns.hypervisor)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Cannot find device path for volume %s: %s", volumeID, err.Error())
    }
@@ -114,6 +141,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
            return nil, status.Error(codes.Internal, err.Error())
        }
    }
+   ctxzap.Extract(ctx).Sugar().Debugf("Staged volume %s (device %s) at target %s successfully", volumeID, devicePath, target)
 
    return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -167,6 +195,15 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
        return nil, status.Errorf(codes.Internal, "Could not unmount target %q: %v", target, err)
    }
 
+   ctxzap.Extract(ctx).Sugar().Debugf("NodeUnstageVolume: unmounted %s from target %s", dev, target)
+
+   v, err := ns.connector.GetVolumeByID(ctx, volumeID)
+   if err != nil {
+       return nil, status.Errorf(codes.Internal, "Could not find volume %s: %v", volumeID, err)
+   }
+
+   ns.mounter.CleanScsi(ctx, v.DeviceID, ns.hypervisor)
+
    return &csi.NodeUnstageVolumeResponse{}, nil
 }
@@ -192,6 +229,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
    if req.GetStagingTargetPath() == "" {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
    }
+   v, err := ns.connector.GetVolumeByID(ctx, volumeID)
+   if err != nil {
+       return nil, status.Errorf(codes.NotFound, "Volume %s not found", volumeID)
+   }
 
    readOnly := req.GetReadonly()
    options := []string{"bind"}
@@ -237,12 +278,13 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
        if err := ns.mounter.Mount(source, targetPath, fsType, options); err != nil {
            return nil, status.Errorf(codes.Internal, "failed to mount %s at %s: %s", source, targetPath, err.Error())
        }
+       ctxzap.Extract(ctx).Sugar().Debugf("mounted volume %s from source %s at target %s", volumeID, source, targetPath)
    }
 
    if req.GetVolumeCapability().GetBlock() != nil {
        volumeID := req.GetVolumeId()
 
-       devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID)
+       devicePath, err := ns.mounter.GetDevicePath(ctx, v.DeviceID, ns.hypervisor)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Cannot find device path for volume %s: %s", volumeID, err.Error())
        }
@@ -269,12 +311,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
        if err := ns.mounter.Mount(devicePath, targetPath, "", options); err != nil {
            return nil, status.Errorf(codes.Internal, "failed to mount %s at %s: %s", devicePath, targetPath, err.Error())
        }
+       ctxzap.Extract(ctx).Sugar().Debugf("mounted block volume %s at target path %s", devicePath, targetPath)
    }
 
    return &csi.NodePublishVolumeResponse{}, nil
 }
 
 func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }
@@ -284,14 +328,17 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
    targetPath := req.GetTargetPath()
    volumeID := req.GetVolumeId()
 
-   if _, err := ns.connector.GetVolumeByID(ctx, volumeID); err == cloud.ErrNotFound {
+   ctxzap.Extract(ctx).Sugar().Debugf("NodeUnpublishVolume: unpublishing volume %s at target path %s", volumeID, targetPath)
+   v, err := ns.connector.GetVolumeByID(ctx, volumeID)
+   if err == cloud.ErrNotFound {
        return nil, status.Errorf(codes.NotFound, "Volume %v not found", volumeID)
    } else if err != nil {
        // Error with CloudStack
        return nil, status.Errorf(codes.Internal, "Error %v", err)
    }
 
-   err := ns.mounter.Unmount(targetPath)
+   err = ns.mounter.Unmount(targetPath)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Unmount of targetpath %s failed with error %v", targetPath, err)
    }
@@ -299,6 +346,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
    if err != nil && !os.IsNotExist(err) {
        return nil, status.Errorf(codes.Internal, "Deleting %s failed with error %v", targetPath, err)
    }
+   ctxzap.Extract(ctx).Sugar().Debugf("NodeUnpublishVolume: successfully unpublished volume %s at target path %s", volumeID, targetPath)
+
+   ns.mounter.CleanScsi(ctx, v.DeviceID, ns.hypervisor)
+
    return &csi.NodeUnpublishVolumeResponse{}, nil
 }
@@ -322,6 +373,55 @@ func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque
    return &csi.NodeGetInfoResponse{
        NodeId:             vm.ID,
        AccessibleTopology: topology.ToCSI(),
+       MaxVolumesPerNode:  int64(ns.maxAllowedBlockVolumesPerNode),
    }, nil
 }
+
+func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+   if req.GetVolumeId() == "" {
+       return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+   }
+   volumeID := req.GetVolumeId()
+
+   volumePath := req.VolumePath
+   if volumePath == "" {
+       return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats Volume Path must be provided")
+   }
+
+   ctxzap.Extract(ctx).Sugar().Debugf("NodeGetVolumeStats: for volume %s", volumeID)
+   _, err := ns.connector.GetVolumeByID(ctx, volumeID)
+   if err == cloud.ErrNotFound {
+       return nil, status.Errorf(codes.NotFound, "Volume %v not found", volumeID)
+   } else if err != nil {
+       // Error with CloudStack
+       return nil, status.Errorf(codes.Internal, "Error %v", err)
+   }
+
+   if _, err := ns.mounter.IsBlockDevice(volumePath); err != nil {
+       return nil, status.Errorf(codes.Internal, "failed to determine if %q is a block device: %s", volumePath, err)
+   }
+
+   stats, err := ns.mounter.GetStatistics(volumePath)
+   if err != nil {
+       return nil, status.Errorf(codes.Internal, "failed to retrieve capacity statistics for volume path %q: %s", volumePath, err)
+   }
+
+   return &csi.NodeGetVolumeStatsResponse{
+       Usage: []*csi.VolumeUsage{
+           {
+               Available: stats.AvailableBytes,
+               Total:     stats.TotalBytes,
+               Used:      stats.UsedBytes,
+               Unit:      csi.VolumeUsage_BYTES,
+           },
+           {
+               Available: stats.AvailableInodes,
+               Total:     stats.TotalInodes,
+               Used:      stats.UsedInodes,
+               Unit:      csi.VolumeUsage_INODES,
+           },
+       },
    }, nil
 }
@@ -335,6 +435,23 @@ func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
                },
            },
        },
+       {
+           Type: &csi.NodeServiceCapability_Rpc{
+               Rpc: &csi.NodeServiceCapability_RPC{
+                   Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
+               },
+           },
+       },
    },
 }, nil
 }
+
+// getMaxAllowedVolumes returns the maximum number of block volumes per node,
+// read from NODE_MAX_BLOCK_VOLUMES if set and valid, otherwise the default.
+func getMaxAllowedVolumes() int {
+   maxVolumes := maxAllowedBlockVolumesPerNode
+   if maxVolumesStr, ok := os.LookupEnv("NODE_MAX_BLOCK_VOLUMES"); ok {
+       if max, err := strconv.Atoi(maxVolumesStr); err == nil {
+           maxVolumes = max
+       }
+   }
+   return maxVolumes
+}
diff --git a/pkg/mount/fake.go b/pkg/mount/fake.go
index 985b920..2e9dd69 100644
--- a/pkg/mount/fake.go
+++ b/pkg/mount/fake.go
@@ -14,6 +14,14 @@ type fakeMounter struct {
    utilsexec.Interface
 }
 
+func (m *fakeMounter) GetStatistics(volumePath string) (volumeStatistics, error) {
+   return volumeStatistics{}, nil
+}
+
+func (m *fakeMounter) IsBlockDevice(devicePath string) (bool, error) {
+   return true, nil
+}
+
 // NewFake creates a fake implementation of the
 // mount.Interface, to be used in tests.
 func NewFake() Interface {
@@ -26,7 +34,7 @@ func NewFake() Interface {
    }
 }
 
-func (m *fakeMounter) GetDevicePath(ctx context.Context, volumeID string) (string, error) {
+func (m *fakeMounter) GetDevicePath(ctx context.Context, deviceID string, hypervisor string) (string, error) {
    return "/dev/sdb", nil
 }
 
@@ -51,3 +59,7 @@ func (*fakeMounter) MakeDir(pathname string) error {
 func (*fakeMounter) MakeFile(pathname string) error {
    return nil
 }
+
+func (m *fakeMounter) CleanScsi(ctx context.Context, deviceID, hypervisor string) {
+   // Do nothing
+}
diff --git a/pkg/mount/mount.go b/pkg/mount/mount.go
index c455028..ee83872 100644
--- a/pkg/mount/mount.go
+++ b/pkg/mount/mount.go
@@ -5,20 +5,20 @@ package mount
 import (
    "context"
    "fmt"
-   "io/ioutil"
    "os"
    "path/filepath"
+   "strconv"
    "strings"
    "time"
 
    "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
+   "golang.org/x/sys/unix"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/mount-utils"
    "k8s.io/utils/exec"
 )
 
 const (
-   diskIDPath = "/dev/disk/by-id"
+   diskIDPath = "/dev/disk/by-path"
 )
 
 // Interface defines the set of methods to allow for
@@ -29,7 +29,12 @@ type Interface interface {
 
    FormatAndMount(source string, target string, fstype string, options []string) error
 
-   GetDevicePath(ctx context.Context, volumeID string) (string, error)
+   CleanScsi(ctx context.Context, deviceID, hypervisor string)
+
+   GetStatistics(volumePath string) (volumeStatistics, error)
+   IsBlockDevice(devicePath string) (bool, error)
+
+   GetDevicePath(ctx context.Context, deviceID string, hypervisor string) (string, error)
    GetDeviceName(mountPath string) (string, int, error)
    ExistsPath(filename string) (bool, error)
    MakeDir(pathname string) error
@@ -41,6 +46,11 @@ type mounter struct {
    exec.Interface
 }
 
+type volumeStatistics struct {
+   AvailableBytes, TotalBytes, UsedBytes    int64
+   AvailableInodes, TotalInodes, UsedInodes int64
+}
+
 // New creates an implementation of the mount.Interface.
 func New() Interface {
    return &mounter{
@@ -52,7 +62,13 @@ func New() Interface {
    }
 }
 
-func (m *mounter) GetDevicePath(ctx context.Context, volumeID string) (string, error) {
+func (m *mounter) GetDevicePath(ctx context.Context, deviceID string, hypervisor string) (string, error) {
+   deviceID = CorrectDeviceId(ctx, deviceID, hypervisor)
+
+   // Build the expected /dev/disk/by-path entry for the (corrected) SCSI device ID
+   deviceID = fmt.Sprintf("pci-0000:00:10.0-scsi-0:0:%s:0", deviceID)
+   ctxzap.Extract(ctx).Sugar().Debugf("device path: %s/%s", diskIDPath, deviceID)
+
    backoff := wait.Backoff{
        Duration: 1 * time.Second,
        Factor:   1.1,
@@ -61,65 +77,87 @@ func (m *mounter) GetDevicePath(ctx context.Context, volumeID string) (string, e
    var devicePath string
    err := wait.ExponentialBackoffWithContext(ctx, backoff, func() (bool, error) {
-       path, err := m.getDevicePathBySerialID(volumeID)
+       path, err := m.getDevicePathBySerialID(deviceID)
        if err != nil {
            return false, err
        }
        if path != "" {
            devicePath = path
+           ctxzap.Extract(ctx).Sugar().Debugf("device path found: %s", path)
            return true, nil
        }
-       m.probeVolume(ctx)
+       m.rescanScsi(ctx)
        return false, nil
    })
 
    if err == wait.ErrWaitTimeout {
-       return "", fmt.Errorf("failed to find device for the volumeID: %q within the alloted time", volumeID)
+       return "", fmt.Errorf("failed to find device for the deviceID: %q within the allotted time", deviceID)
    } else if devicePath == "" {
-       return "", fmt.Errorf("device path was empty for volumeID: %q", volumeID)
+       return "", fmt.Errorf("device path was empty for deviceID: %q", deviceID)
    }
    return devicePath, nil
 }
 
-func (m *mounter) getDevicePathBySerialID(volumeID string) (string, error) {
-   sourcePathPrefixes := []string{"virtio-", "scsi-", "scsi-0QEMU_QEMU_HARDDISK_"}
-   serial := diskUUIDToSerial(volumeID)
-   for _, prefix := range sourcePathPrefixes {
-       source := filepath.Join(diskIDPath, prefix+serial)
-       _, err := os.Stat(source)
-       if err == nil {
-           return source, nil
-       }
-       if !os.IsNotExist(err) {
-           return "", err
-       }
-   }
+// CorrectDeviceId corrects the device ID on the node: the SCSI ID seen by the
+// guest may not match the device ID assigned by the CloudStack controller.
+// 1. CloudStack assumes that SCSI ID 3 is always the CD-ROM and skips this ID:
+//    https://github.com/apache/cloudstack/blob/98d42750cc21dfce5a8dd6d1880e09a621e0152e/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java#L3442
+// 2. SCSI ID 7 is reserved for the virtual SCSI controller:
+//    https://docs.vmware.com/en/VMware-vSphere/6.0/com.vmware.vsphere.hostclient.doc/GUID-5872D173-A076-42FE-8D0B-9DB0EB0E7362_copy.html
+func CorrectDeviceId(ctx context.Context, deviceID, hypervisor string) string {
+   ctxzap.Extract(ctx).Sugar().Debugf("device id: '%s' (hypervisor: %s)", deviceID, hypervisor)
+
+   if strings.ToLower(hypervisor) == "vmware" {
+       ctxzap.Extract(ctx).Sugar().Debugf("hypervisor is VMware, correcting SCSI IDs between 3 and 7")
+       idInt, err := strconv.Atoi(deviceID)
+       if err != nil {
+           // Not a numeric SCSI ID; leave it unchanged
+           return deviceID
+       }
+       if idInt > 3 && idInt <= 7 {
+           idInt--
+           deviceID = strconv.Itoa(idInt)
+           ctxzap.Extract(ctx).Sugar().Debugf("new device id: %s", deviceID)
+       }
+   }
+
+   return deviceID
+}
+
+func (m *mounter) getDevicePathBySerialID(volumeID string) (string, error) {
+   source := filepath.Join(diskIDPath, volumeID)
+   _, err := os.Stat(source)
+   if err == nil {
+       return source, nil
+   }
+   if !os.IsNotExist(err) {
+       return "", err
+   }
    return "", nil
 }
 
-func (m *mounter) probeVolume(ctx context.Context) {
+func (m *mounter) rescanScsi(ctx context.Context) {
    log := ctxzap.Extract(ctx).Sugar()
-   log.Debug("Scaning SCSI host...")
+   log.Debug("Scanning SCSI hosts...")
 
-   scsiPath := "/sys/class/scsi_host/"
-   if dirs, err := ioutil.ReadDir(scsiPath); err == nil {
-       for _, f := range dirs {
-           name := scsiPath + f.Name() + "/scan"
-           data := []byte("- - -")
-           if err = ioutil.WriteFile(name, data, 0666); err != nil {
-               log.Warnf("Failed to rescan scsi host %s", name)
-           }
-       }
-   } else {
-       log.Warnf("Failed to read %s, err %v", scsiPath, err)
-   }
-
-   args := []string{"trigger"}
-   cmd := m.Exec.Command("udevadm", args...)
-   _, err := cmd.CombinedOutput()
-   if err != nil {
-       log.Warnf("Error running udevadm trigger %v\n", err)
+   cmd := m.Exec.Command("rescan-scsi-bus.sh")
+   if _, err := cmd.CombinedOutput(); err != nil {
+       log.Warnf("Error running rescan-scsi-bus.sh: %v", err)
    }
 }
+
+func (m *mounter) CleanScsi(ctx context.Context, deviceID, hypervisor string) {
+   log := ctxzap.Extract(ctx).Sugar()
+
+   deviceID = CorrectDeviceId(ctx, deviceID, hypervisor)
+
+   devicePath := fmt.Sprintf("/sys/class/scsi_device/0:0:%s:0/device/delete", deviceID)
+   log.Debugf("removing SCSI device at %s", devicePath)
+   cmd := m.Exec.Command("clean-scsi-bus.sh", deviceID)
+   out, err := cmd.CombinedOutput()
+   if err != nil {
+       log.Warnf("Error running clean-scsi-bus.sh for %s: %v", devicePath, err)
+   }
+
+   log.Debugf("clean-scsi-bus.sh output: %s", string(out))
+}
 
 func (m *mounter) GetDeviceName(mountPath string) (string, int, error) {
@@ -170,3 +208,57 @@ func (*mounter) MakeFile(pathname string) error {
    }
    return nil
 }
+
+// GetStatistics is adapted from
+// https://github.com/digitalocean/csi-digitalocean/blob/db266f4044178a96c5aa9e2420efae8723af75f4/driver/mounter.go
+func (m *mounter) GetStatistics(volumePath string) (volumeStatistics, error) {
+   isBlock, err := m.IsBlockDevice(volumePath)
+   if err != nil {
+       return volumeStatistics{}, fmt.Errorf("failed to determine if volume %s is a block device: %v", volumePath, err)
+   }
+
+   if isBlock {
+       // See http://man7.org/linux/man-pages/man8/blockdev.8.html for details
+       output, err := m.Exec.Command("blockdev", "--getsize64", volumePath).CombinedOutput()
+       if err != nil {
+           return volumeStatistics{}, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", volumePath, string(output), err)
+       }
+       strOut := strings.TrimSpace(string(output))
+       gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
+       if err != nil {
+           return volumeStatistics{}, fmt.Errorf("failed to parse size %s into int", strOut)
+       }
+
+       return volumeStatistics{
+           TotalBytes: gotSizeBytes,
+       }, nil
+   }
+
+   var statfs unix.Statfs_t
+   // See http://man7.org/linux/man-pages/man2/statfs.2.html for details.
+   err = unix.Statfs(volumePath, &statfs)
+   if err != nil {
+       return volumeStatistics{}, err
+   }
+
+   volStats := volumeStatistics{
+       AvailableBytes: int64(statfs.Bavail) * int64(statfs.Bsize),
+       TotalBytes:     int64(statfs.Blocks) * int64(statfs.Bsize),
+       UsedBytes:      (int64(statfs.Blocks) - int64(statfs.Bfree)) * int64(statfs.Bsize),
+
+       AvailableInodes: int64(statfs.Ffree),
+       TotalInodes:     int64(statfs.Files),
+       UsedInodes:      int64(statfs.Files) - int64(statfs.Ffree),
+   }
+
+   return volStats, nil
+}
+
+func (m *mounter) IsBlockDevice(devicePath string) (bool, error) {
+   var stat unix.Stat_t
+   err := unix.Stat(devicePath, &stat)
+   if err != nil {
+       return false, err
+   }
+
+   return (stat.Mode & unix.S_IFMT) == unix.S_IFBLK, nil
+}
diff --git a/rescan-scsi-bus.sh b/rescan-scsi-bus.sh
new file mode 100755
index 0000000..c1285e4
--- /dev/null
+++ b/rescan-scsi-bus.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+for host in /sys/class/scsi_host/host*/scan ; do
+  echo "- - -" > "$host"
+done
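
For reference, a minimal `cloudstack.ini` matching the keys read by `ReadConfig` in this patch might look like the sketch below. The `[Global]` section name and all values are assumptions (the section name follows the convention of the CloudStack Kubernetes Provider, which this driver can share a secret with); adjust them to your deployment:

```ini
[Global]
; CloudStack API endpoint and credentials (placeholders)
api-url       = https://cloudstack.example.com/client/api
api-key       = <api-key>
secret-key    = <secret-key>
; set to true to skip TLS certificate verification
ssl-no-verify = false
; optional: CloudStack project in which volumes are created (new in this change)
project-id    = <project-uuid>
```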