Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added storageclass allowedTopologies support #18

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/cloudstack-csi-sc-syncer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,26 @@ spec:
args:
- "-cloudstackconfig=/etc/cloudstack-csi-driver/cloud-config"
- "-kubeconfig=-"
- "-nodeName=$(NODE_NAME)"
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: cloudstack-conf
mountPath: /etc/cloudstack-csi-driver
- name: cloud-init-dir
mountPath: /run/cloud-init/
restartPolicy: Never
volumes:
- name: cloudstack-conf
secret:
secretName: cloudstack-secret
- name: cloud-init-dir
hostPath:
path: /run/cloud-init/
type: Directory
E0F
```

Expand Down
28 changes: 16 additions & 12 deletions cmd/cloudstack-csi-sc-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
const agent = "cloudstack-csi-sc-syncer"

var (
cloudstackconfig = flag.String("cloudstackconfig", "./cloud-config", "CloudStack configuration file")
kubeconfig = flag.String("kubeconfig", path.Join(os.Getenv("HOME"), ".kube/config"), "Kubernetes configuration file. Use \"-\" to use in-cluster configuration.")
label = flag.String("label", "app.kubernetes.io/managed-by="+agent, "")
namePrefix = flag.String("namePrefix", "cloudstack-", "")
delete = flag.Bool("delete", false, "Delete")
showVersion = flag.Bool("version", false, "Show version")
cloudstackconfig = flag.String("cloudstackconfig", "./cloud-config", "CloudStack configuration file")
kubeconfig = flag.String("kubeconfig", path.Join(os.Getenv("HOME"), ".kube/config"), "Kubernetes configuration file. Use \"-\" to use in-cluster configuration.")
label = flag.String("label", "app.kubernetes.io/managed-by="+agent, "")
nodeName = flag.String("nodeName", "", "Node name")
addAllowedTopology = flag.Bool("addAllowedTopology", false, "Add allowed topology to storageclass")
namePrefix = flag.String("namePrefix", "cloudstack-", "")
delete = flag.Bool("delete", false, "Delete")
showVersion = flag.Bool("version", false, "Show version")

// Version is set by the build process
version = ""
Expand All @@ -37,12 +39,14 @@ func main() {
}

s, err := syncer.New(syncer.Config{
Agent: agent,
CloudStackConfig: *cloudstackconfig,
KubeConfig: *kubeconfig,
Label: *label,
NamePrefix: *namePrefix,
Delete: *delete,
Agent: agent,
CloudStackConfig: *cloudstackconfig,
KubeConfig: *kubeconfig,
Label: *label,
NodeName: *nodeName,
AddAllowedTopology: *addAllowedTopology,
NamePrefix: *namePrefix,
Delete: *delete,
})
if err != nil {
log.Fatalf("Error: %v", err)
Expand Down
65 changes: 63 additions & 2 deletions pkg/syncer/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
volBindingMode = storagev1.VolumeBindingWaitForFirstConsumer
reclaimPolicy = corev1.PersistentVolumeReclaimDelete
allowVolumeExpansion = false
zoneID string
)

func (s syncer) Run(ctx context.Context) error {
Expand Down Expand Up @@ -91,6 +92,22 @@ func (s syncer) Run(ctx context.Context) error {
return combinedError(errs)
}

func getAllowedTopologies() []corev1.TopologySelectorTerm {
if zoneID != "" {
return []corev1.TopologySelectorTerm{
{
MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{
{
Key: "topology." + driver.DriverName + "/zone",
Values: []string{zoneID},
},
},
},
}
}
return nil
}

func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffering) (string, error) {
offeringName := offering.Name
custom := offering.Iscustomized
Expand All @@ -107,6 +124,8 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
}
log.Printf("Storage class name: %s", name)

zoneID = s.getZoneID(ctx)

sc, err := s.k8sClient.StorageV1().StorageClasses().Get(ctx, name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -128,6 +147,14 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
driver.DiskOfferingKey: offering.Id,
},
}

//Add AllowedTopologies if the addAllowedTopology flag is true
if s.addAllowedTopology {
if getAllowedTopologies() != nil {
newSc.AllowedTopologies = getAllowedTopologies()
}
}

_, err = s.k8sClient.StorageV1().StorageClasses().Create(ctx, newSc, metav1.CreateOptions{})
return name, err
}
Expand All @@ -136,7 +163,7 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer

// Storage class already exists

err = checkStorageClass(sc, offering.Id)
err = s.checkStorageClass(sc, offering.Id)
if err != nil {
// Updates to provisioner, reclaimpolicy, volumeBindingMode and parameters are forbidden
log.Printf("Storage class %s exists but it not compatible.", name)
Expand All @@ -159,7 +186,32 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
return name, nil
}

func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) error {
func getExistingZoneID(terms []corev1.TopologySelectorTerm) string {
prefix := "topology." + driver.DriverName + "/zone"
for _, term := range terms {
for _, exp := range term.MatchLabelExpressions {
if exp.Key == prefix {
if len(exp.Values) > 0 {
return exp.Values[0]
}
}
}
}
return ""
}

// get ZoneID of the node where syncer is running
func (s syncer) getZoneID(ctx context.Context) string {
vm, err := s.csConnector.GetNodeInfo(ctx, s.nodeName)
if err != nil {
log.Printf("GetNodeinfo failed: %s", err.Error())
} else {
return vm.ZoneID
}
return ""
}

func (s syncer) checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) error {
errs := make([]error, 0)
diskOfferingID, ok := sc.Parameters[driver.DiskOfferingKey]
if !ok {
Expand All @@ -177,6 +229,15 @@ func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string) er
if sc.AllowVolumeExpansion == nil || *sc.AllowVolumeExpansion != allowVolumeExpansion {
errs = append(errs, errors.New("wrong AllowVolumeExpansion"))
}
if s.addAllowedTopology {
if sc.AllowedTopologies == nil {
errs = append(errs, errors.New("allowedtopology flag is true but missing allowedtopologies"))
} else if sc.AllowedTopologies != nil {
if zoneID != getExistingZoneID(sc.AllowedTopologies) {
errs = append(errs, errors.New("allowedtopology flag is true but zoneID is not the same with desired zoneID: "+zoneID))
}
}
}

if len(errs) > 0 {
return combinedError(errs)
Expand Down
56 changes: 40 additions & 16 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (

// Config holds the syncer tool configuration.
type Config struct {
Agent string
CloudStackConfig string
KubeConfig string
Label string
NamePrefix string
Delete bool
Agent string
CloudStackConfig string
KubeConfig string
Label string
NodeName string
AddAllowedTopology bool
NamePrefix string
Delete bool
}

// Syncer has a function Run which synchronizes CloudStack
Expand All @@ -36,11 +38,14 @@ type Syncer interface {

// syncer is Syncer implementation.
type syncer struct {
k8sClient *kubernetes.Clientset
csClient *cloudstack.CloudStackClient
labelsSet labels.Set
namePrefix string
delete bool
k8sClient *kubernetes.Clientset
csClient *cloudstack.CloudStackClient
csConnector cloud.Interface
nodeName string
addAllowedTopology bool
labelsSet labels.Set
namePrefix string
delete bool
}

func createK8sClient(kubeconfig, agent string) (*kubernetes.Clientset, error) {
Expand Down Expand Up @@ -70,6 +75,17 @@ func createCloudStackClient(cloudstackconfig string) (*cloudstack.CloudStackClie
return client, nil
}

func createCSConnector(cloudstackconfig string) (cloud.Interface, error) {
config, err := cloud.ReadConfig(cloudstackconfig)
if err != nil {
return nil, err
}

csConnector := cloud.New(config)

return csConnector, nil
}

func createLabelsSet(label string) labels.Set {
m := make(map[string]string)
if len(label) > 0 {
Expand All @@ -95,11 +111,19 @@ func New(config Config) (Syncer, error) {
return nil, fmt.Errorf("cannot create CloudStack client: %w", err)
}

csConnector, err := createCSConnector(config.CloudStackConfig)
if err != nil {
return nil, fmt.Errorf("cannot create CS connector interface: %w", err)
}

return syncer{
k8sClient: k8sClient,
csClient: csClient,
labelsSet: createLabelsSet(config.Label),
namePrefix: config.NamePrefix,
delete: config.Delete,
k8sClient: k8sClient,
csClient: csClient,
csConnector: csConnector,
nodeName: config.NodeName,
addAllowedTopology: config.AddAllowedTopology,
labelsSet: createLabelsSet(config.Label),
namePrefix: config.NamePrefix,
delete: config.Delete,
}, nil
}