diff --git a/cmd/cloudstack-csi-sc-syncer/README.md b/cmd/cloudstack-csi-sc-syncer/README.md index 3c4cbba..d628467 100644 --- a/cmd/cloudstack-csi-sc-syncer/README.md +++ b/cmd/cloudstack-csi-sc-syncer/README.md @@ -89,14 +89,26 @@ spec: args: - "-cloudstackconfig=/etc/cloudstack-csi-driver/cloud-config" - "-kubeconfig=-" + - "-nodeName=$(NODE_NAME)" + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName volumeMounts: - name: cloudstack-conf mountPath: /etc/cloudstack-csi-driver + - name: cloud-init-dir + mountPath: /run/cloud-init/ restartPolicy: Never volumes: - name: cloudstack-conf secret: secretName: cloudstack-secret + - name: cloud-init-dir + hostPath: + path: /run/cloud-init/ + type: Directory E0F ``` diff --git a/cmd/cloudstack-csi-sc-syncer/main.go b/cmd/cloudstack-csi-sc-syncer/main.go index cf3edfc..197a96d 100644 --- a/cmd/cloudstack-csi-sc-syncer/main.go +++ b/cmd/cloudstack-csi-sc-syncer/main.go @@ -16,12 +16,14 @@ import ( const agent = "cloudstack-csi-sc-syncer" var ( - cloudstackconfig = flag.String("cloudstackconfig", "./cloud-config", "CloudStack configuration file") - kubeconfig = flag.String("kubeconfig", path.Join(os.Getenv("HOME"), ".kube/config"), "Kubernetes configuration file. Use \"-\" to use in-cluster configuration.") - label = flag.String("label", "app.kubernetes.io/managed-by="+agent, "") - namePrefix = flag.String("namePrefix", "cloudstack-", "") - delete = flag.Bool("delete", false, "Delete") - showVersion = flag.Bool("version", false, "Show version") + cloudstackconfig = flag.String("cloudstackconfig", "./cloud-config", "CloudStack configuration file") + kubeconfig = flag.String("kubeconfig", path.Join(os.Getenv("HOME"), ".kube/config"), "Kubernetes configuration file. Use \"-\" to use in-cluster configuration.") + label = flag.String("label", "app.kubernetes.io/managed-by="+agent, "") + nodeName = flag.String("nodeName", "", "Node name") + addAllowedTopology = flag.Bool("addAllowedTopology", false, "Add allowed topology to storageclass") + namePrefix = flag.String("namePrefix", "cloudstack-", "") + delete = flag.Bool("delete", false, "Delete") + showVersion = flag.Bool("version", false, "Show version") // Version is set by the build process version = "" @@ -37,12 +39,14 @@ func main() { } s, err := syncer.New(syncer.Config{ - Agent: agent, - CloudStackConfig: *cloudstackconfig, - KubeConfig: *kubeconfig, - Label: *label, - NamePrefix: *namePrefix, - Delete: *delete, + Agent: agent, + CloudStackConfig: *cloudstackconfig, + KubeConfig: *kubeconfig, + Label: *label, + NodeName: *nodeName, + AddAllowedTopology: *addAllowedTopology, + NamePrefix: *namePrefix, + Delete: *delete, }) if err != nil { log.Fatalf("Error: %v", err) diff --git a/pkg/syncer/run.go b/pkg/syncer/run.go index d8262d6..c1e60b8 100644 --- a/pkg/syncer/run.go +++ b/pkg/syncer/run.go @@ -20,6 +20,7 @@ var ( volBindingMode = storagev1.VolumeBindingWaitForFirstConsumer reclaimPolicy = corev1.PersistentVolumeReclaimDelete allowVolumeExpansion = false + zoneID string ) func (s syncer) Run(ctx context.Context) error { @@ -91,6 +92,22 @@ func (s syncer) Run(ctx context.Context) error { return combinedError(errs) } +func getAllowedTopologies() []corev1.TopologySelectorTerm { + if zoneID != "" { + return []corev1.TopologySelectorTerm{ + { + MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{ + { + Key: "topology." + driver.DriverName + "/zone", + Values: []string{zoneID}, + }, + }, + }, + } + } + return nil +} + func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffering) (string, error) { offeringName := offering.Name custom := offering.Iscustomized @@ -107,6 +124,8 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer } log.Printf("Storage class name: %s", name) + zoneID = s.getZoneID(ctx) + sc, err := s.k8sClient.StorageV1().StorageClasses().Get(ctx, name, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { @@ -128,6 +147,14 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer driver.DiskOfferingKey: offering.Id, }, } + + //Add AllowedTopologies if the addAllowedTopology flag is true + if s.addAllowedTopology { + if getAllowedTopologies() != nil { + newSc.AllowedTopologies = getAllowedTopologies() + } + } + _, err = s.k8sClient.StorageV1().StorageClasses().Create(ctx, newSc, metav1.CreateOptions{}) return name, err } @@ -136,7 +163,7 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer // Storage class already exists - err = checkStorageClass(sc, offering.Id) + err = s.checkStorageClass(sc, offering.Id) if err != nil { // Updates to provisioner, reclaimpolicy, volumeBindingMode and parameters are forbidden log.Printf("Storage class %s exists but it not compatible.", name) @@ -159,7 +186,32 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer return name, nil } -func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) error { +func getExistingZoneID(terms []corev1.TopologySelectorTerm) string { + prefix := "topology." + driver.DriverName + "/zone" + for _, term := range terms { + for _, exp := range term.MatchLabelExpressions { + if exp.Key == prefix { + if len(exp.Values) > 0 { + return exp.Values[0] + } + } + } + } + return "" +} + +// get ZoneID of the node where syncer is running +func (s syncer) getZoneID(ctx context.Context) string { + vm, err := s.csConnector.GetNodeInfo(ctx, s.nodeName) + if err != nil { + log.Printf("GetNodeinfo failed: %s", err.Error()) + } else { + return vm.ZoneID + } + return "" +} + +func (s syncer) checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) error { errs := make([]error, 0) diskOfferingID, ok := sc.Parameters[driver.DiskOfferingKey] if !ok { @@ -177,6 +229,15 @@ func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) er if sc.AllowVolumeExpansion == nil || *sc.AllowVolumeExpansion != allowVolumeExpansion { errs = append(errs, errors.New("wrong AllowVolumeExpansion")) } + if s.addAllowedTopology { + if sc.AllowedTopologies == nil { + errs = append(errs, errors.New("allowedtopology flag is true but missing allowedtopologies")) + } else if sc.AllowedTopologies != nil { + if zoneID != getExistingZoneID(sc.AllowedTopologies) { + errs = append(errs, errors.New("allowedtopology flag is true but zoneID is not the same with desired zoneID: "+zoneID)) + } + } + } if len(errs) > 0 { return combinedError(errs) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index df936e9..1ef3515 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -20,12 +20,14 @@ import ( // Config holds the syncer tool configuration. type Config struct { - Agent string - CloudStackConfig string - KubeConfig string - Label string - NamePrefix string - Delete bool + Agent string + CloudStackConfig string + KubeConfig string + Label string + NodeName string + AddAllowedTopology bool + NamePrefix string + Delete bool } // Syncer has a function Run which synchronizes CloudStack @@ -36,11 +38,14 @@ type Syncer interface { // syncer is Syncer implementation. type syncer struct { - k8sClient *kubernetes.Clientset - csClient *cloudstack.CloudStackClient - labelsSet labels.Set - namePrefix string - delete bool + k8sClient *kubernetes.Clientset + csClient *cloudstack.CloudStackClient + csConnector cloud.Interface + nodeName string + addAllowedTopology bool + labelsSet labels.Set + namePrefix string + delete bool } func createK8sClient(kubeconfig, agent string) (*kubernetes.Clientset, error) { @@ -70,6 +75,17 @@ func createCloudStackClient(cloudstackconfig string) (*cloudstack.CloudStackClie return client, nil } +func createCSConnector(cloudstackconfig string) (cloud.Interface, error) { + config, err := cloud.ReadConfig(cloudstackconfig) + if err != nil { + return nil, err + } + + csConnector := cloud.New(config) + + return csConnector, nil +} + func createLabelsSet(label string) labels.Set { m := make(map[string]string) if len(label) > 0 { @@ -95,11 +111,19 @@ func New(config Config) (Syncer, error) { return nil, fmt.Errorf("cannot create CloudStack client: %w", err) } + csConnector, err := createCSConnector(config.CloudStackConfig) + if err != nil { + return nil, fmt.Errorf("cannot create CS connector interface: %w", err) + } + return syncer{ - k8sClient: k8sClient, - csClient: csClient, - labelsSet: createLabelsSet(config.Label), - namePrefix: config.NamePrefix, - delete: config.Delete, + k8sClient: k8sClient, + csClient: csClient, + csConnector: csConnector, + nodeName: config.NodeName, + addAllowedTopology: config.AddAllowedTopology, + labelsSet: createLabelsSet(config.Label), + namePrefix: config.NamePrefix, + delete: config.Delete, }, nil }