Skip to content

Commit

Permalink
Add simplybk cluster support #1
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamdy-khader committed Aug 30, 2023
1 parent ed43ede commit a3350cd
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 167 deletions.
12 changes: 4 additions & 8 deletions deploy/kubernetes/config-map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@ data:
# targetAddr: target service IP
config.json: |-
{
"nodes": [
{
"name": "pool1",
"rpcURL": "http://34.244.195.34",
"targetType": "nvme-tcp",
"targetAddr": "127.0.0.1"
}
]
"simplybk": {
"uuid": "185a1bba-b16f-4b99-b7e9-a983f193342c",
"ip": "34.244.195.34"
}
}
10 changes: 3 additions & 7 deletions deploy/kubernetes/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ stringData:
# }
secret.json: |-
{
"rpcTokens": [
{
"name": "pool1",
"username": "spdkcsiuser",
"password": "spdkcsipass"
}
]
"simplybk": {
"secret": "a983f193342c"
}
}
1 change: 1 addition & 0 deletions deploy/kubernetes/storageclass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ metadata:
provisioner: csi.spdk.io
parameters:
fsType: ext4
pool_name: pool1
reclaimPolicy: Delete
volumeBindingMode: Immediate
181 changes: 82 additions & 99 deletions pkg/spdk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ var errVolumeInCreation = status.Error(codes.Internal, "volume in creation")

type controllerServer struct {
*csicommon.DefaultControllerServer
spdkNodes map[string]util.SpdkNode // all spdk nodes in cluster
volumeLocks *util.VolumeLocks
spdkNode *util.NodeNVMf
}

type spdkVolume struct {
lvolID string
nodeName string
poolName string
}

func (cs *controllerServer) CreateVolume(_ context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
Expand Down Expand Up @@ -139,13 +139,13 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna
return nil, err
}

snapshotID, err := cs.spdkNodes[spdkVol.nodeName].CreateSnapshot(spdkVol.lvolID, snapshotName)
snapshotID, err := cs.spdkNode.CreateSnapshot(spdkVol.lvolID, snapshotName)
if err != nil {
klog.Errorf("failed to create snapshot, volumeID: %s snapshotName: %s err: %v", volumeID, snapshotName, err)
return nil, status.Error(codes.Internal, err.Error())
}

volInfo, err := cs.spdkNodes[spdkVol.nodeName].VolumeInfo(spdkVol.lvolID)
volInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID)
if err != nil {
klog.Errorf("failed to get volume info, volumeID: %s err: %v", volumeID, err)
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -158,7 +158,7 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna
creationTime := timestamppb.Now()
snapshotData := csi.Snapshot{
SizeBytes: size,
SnapshotId: fmt.Sprintf("%s:%s", spdkVol.nodeName, snapshotID),
SnapshotId: fmt.Sprintf("%s:%s", spdkVol.poolName, snapshotID),
SourceVolumeId: spdkVol.lvolID,
CreationTime: creationTime,
ReadyToUse: true,
Expand All @@ -180,7 +180,7 @@ func (cs *controllerServer) DeleteSnapshot(_ context.Context, req *csi.DeleteSna
return nil, err
}

err = cs.spdkNodes[spdkVol.nodeName].DeleteSnapshot(spdkVol.lvolID)
err = cs.spdkNode.DeleteSnapshot(spdkVol.lvolID)
if err != nil {
klog.Errorf("failed to delete snapshot, snapshotID: %s err: %v", snapshotID, err)
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -202,37 +202,18 @@ func (cs *controllerServer) createVolume(req *csi.CreateVolumeRequest) (*csi.Vol
ContentSource: req.GetVolumeContentSource(),
}

// check all SPDK nodes to see if the volume has already been created
for nodeName, node := range cs.spdkNodes {
lvStores, err := node.LvStores()
if err != nil {
return nil, fmt.Errorf("get lvstores of node:%s failed: %w", nodeName, err)
}
for lvsIdx := range lvStores {
volumeID, err := node.GetVolume(req.GetName(), lvStores[lvsIdx].Name)
if err == nil {
vol.VolumeId = fmt.Sprintf("%s:%s", nodeName, volumeID)
return &vol, nil
}
}
pool_name := req.GetParameters()["pool_name"]
volumeID, err := cs.spdkNode.GetVolume(req.GetName(), pool_name)
if err == nil {
vol.VolumeId = fmt.Sprintf("%s:%s", pool_name, volumeID)
return &vol, nil
}

// schedule suitable node:lvstore
nodeName, lvstore, err := cs.schedule(sizeMiB)
volumeID, err = cs.spdkNode.CreateVolume(req.GetName(), pool_name, sizeMiB)
if err != nil {
return nil, err
}

// TODO: re-schedule on ErrJSONNoSpaceLeft per optimistic concurrency control
volumeID, err := cs.spdkNodes[nodeName].CreateVolume(req.GetName(), lvstore, sizeMiB)
if err != nil {
return nil, err
}
// in the subsequent DeleteVolume() request, a nodeName needs to be specified,
// but the current CSI mechanism only passes the VolumeId to DeleteVolume().
// therefore, the nodeName is included as part of the VolumeId.
vol.VolumeId = fmt.Sprintf("%s:%s", nodeName, volumeID)

vol.VolumeId = fmt.Sprintf("%s:%s", pool_name, volumeID)
return &vol, nil
}

Expand All @@ -245,24 +226,24 @@ func getSPDKVol(csiVolumeID string) (*spdkVolume, error) {
ids := strings.Split(csiVolumeID, ":")
if len(ids) == 2 {
return &spdkVolume{
nodeName: ids[0],
poolName: ids[0],
lvolID: ids[1],
}, nil
}
return nil, fmt.Errorf("missing nodeName in volume: %s", csiVolumeID)
return nil, fmt.Errorf("missing poolName in volume: %s", csiVolumeID)
}

func (cs *controllerServer) publishVolume(volumeID string) (map[string]string, error) {
spdkVol, err := getSPDKVol(volumeID)
if err != nil {
return nil, err
}
err = cs.spdkNodes[spdkVol.nodeName].PublishVolume(spdkVol.lvolID)
err = cs.spdkNode.PublishVolume(spdkVol.lvolID)
if err != nil {
return nil, err
}

volumeInfo, err := cs.spdkNodes[spdkVol.nodeName].VolumeInfo(spdkVol.lvolID)
volumeInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID)
if err != nil {
cs.unpublishVolume(volumeID) //nolint:errcheck // we can do little
return nil, err
Expand All @@ -275,103 +256,105 @@ func (cs *controllerServer) deleteVolume(volumeID string) error {
if err != nil {
return err
}
return cs.spdkNodes[spdkVol.nodeName].DeleteVolume(spdkVol.lvolID)
return cs.spdkNode.DeleteVolume(spdkVol.lvolID)
}

func (cs *controllerServer) unpublishVolume(volumeID string) error {
spdkVol, err := getSPDKVol(volumeID)
if err != nil {
return err
}
return cs.spdkNodes[spdkVol.nodeName].UnpublishVolume(spdkVol.lvolID)
}

// simplest volume scheduler: find first node:lvstore with enough free space
func (cs *controllerServer) schedule(sizeMiB int64) (nodeName, lvstore string, err error) {
for name, spdkNode := range cs.spdkNodes {
// retrieve latest lvstore info from spdk node
lvstores, err := spdkNode.LvStores()
if err != nil {
klog.Errorf("failed to get lvstores from node %s: %s", spdkNode.Info(), err.Error())
continue
}
// check if lvstore has enough free space
for i := range lvstores {
lvstore := &lvstores[i]
if lvstore.FreeSizeMiB > sizeMiB {
return name, lvstore.Name, nil
}
}
klog.Infof("not enough free space from node %s", spdkNode.Info())
}

return "", "", fmt.Errorf("failed to find node with enough free space")
return cs.spdkNode.UnpublishVolume(spdkVol.lvolID)
}

func newControllerServer(d *csicommon.CSIDriver) (*controllerServer, error) {
server := controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
spdkNodes: map[string]util.SpdkNode{},
spdkNode: util.NodeNVMf{},
volumeLocks: util.NewVolumeLocks(),
}

// get spdk node configs, see deploy/kubernetes/config-map.yaml
//nolint:tagliatelle // not using json:snake case
var config struct {
Nodes []struct {
Name string `json:"name"`
URL string `json:"rpcURL"`
TargetType string `json:"targetType"`
TargetAddr string `json:"targetAddr"`
} `json:"Nodes"`
Simplybk struct {
Uuid string `json:"uuid"`
Ip string `json:"ip"`
} `json:"simplybk"`
}
configFile := util.FromEnv("SPDKCSI_CONFIG", "/etc/spdkcsi-config/config.json")
err := util.ParseJSONFile(configFile, &config)
if err != nil {
return nil, err
}

// get spdk node secrets, see deploy/kubernetes/secret.yaml
//nolint:tagliatelle // not using json:snake case
var secret struct {
Tokens []struct {
Name string `json:"name"`
UserName string `json:"username"`
Password string `json:"password"`
} `json:"rpcTokens"`
Simplybk struct {
Secret string `json:"secret"`
} `json:"simplybk"`
}
secretFile := util.FromEnv("SPDKCSI_SECRET", "/etc/spdkcsi-secret/secret.json")
err = util.ParseJSONFile(secretFile, &secret)
if err != nil {
return nil, err
}

// create spdk nodes
for i := range config.Nodes {
node := &config.Nodes[i]
tokenFound := false
// find secret per node
for j := range secret.Tokens {
token := &secret.Tokens[j]
if token.Name == node.Name {
tokenFound = true
spdkNode, err := util.NewSpdkNode(node.URL, token.UserName, token.Password, node.TargetType, node.TargetAddr)
if err != nil {
klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error())
} else {
klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL)
server.spdkNodes[node.Name] = spdkNode
}
break
}
}
if !tokenFound {
klog.Errorf("failed to find secret for spdk node %s", node.Name)
}
}
if len(server.spdkNodes) == 0 {
spdkNode, err := util.newNVMf(config.Simplybk.Uuid, config.Simplybk.Ip, secret.Simplybk.Secret)
if err != nil {
klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error())
return nil, fmt.Errorf("no valid spdk node found")
} else {
klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL)
server.spdkNode = spdkNode
return &server, nil
}

return &server, nil
// create spdk nodes
//for i := range config.Nodes {
// node := &config.Nodes[i]
// tokenFound := false
// // find secret per node
// for j := range secret.Tokens {
// token := &secret.Tokens[j]
// if token.Name == node.Name {
// tokenFound = true
// spdkNode, err := util.NewSpdkNode(node.URL, token.UserName, token.Password, node.TargetType, node.TargetAddr)
// if err != nil {
// klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error())
// } else {
// klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL)
// server.spdkNodes[node.Name] = spdkNode
// }
// break
// }
// }
// if !tokenFound {
// klog.Errorf("failed to find secret for spdk node %s", node.Name)
// }
//}
//if len(server.spdkNodes) == 0 {
// return nil, fmt.Errorf("no valid spdk node found")
//}
//
//return &server, nil
}

func (cs *DefaultControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *DefaultControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *DefaultControllerServer) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *DefaultControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *DefaultControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
2 changes: 1 addition & 1 deletion pkg/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package util

const (
// TODO: move hardcoded settings to config map
cfgRPCTimeoutSeconds = 20
cfgRPCTimeoutSeconds = 60
cfgLvolClearMethod = "unmap" // none, unmap, write_zeroes
cfgLvolThinProvision = true
cfgNVMfSvcPort = "4420"
Expand Down
Loading

0 comments on commit a3350cd

Please sign in to comment.