diff --git a/deploy/kubernetes/config-map.yaml b/deploy/kubernetes/config-map.yaml index 1c6c988..5ee3ae1 100644 --- a/deploy/kubernetes/config-map.yaml +++ b/deploy/kubernetes/config-map.yaml @@ -12,12 +12,8 @@ data: # targetAddr: target service IP config.json: |- { - "nodes": [ - { - "name": "pool1", - "rpcURL": "http://34.244.195.34", - "targetType": "nvme-tcp", - "targetAddr": "127.0.0.1" - } - ] + "simplybk": { + "uuid": "185a1bba-b16f-4b99-b7e9-a983f193342c", + "ip": "34.244.195.34" + } } diff --git a/deploy/kubernetes/secret.yaml b/deploy/kubernetes/secret.yaml index b86ee51..f15d384 100644 --- a/deploy/kubernetes/secret.yaml +++ b/deploy/kubernetes/secret.yaml @@ -15,11 +15,7 @@ stringData: # } secret.json: |- { - "rpcTokens": [ - { - "name": "pool1", - "username": "spdkcsiuser", - "password": "spdkcsipass" - } - ] + "simplybk": { + "secret": "a983f193342c" + } } diff --git a/deploy/kubernetes/storageclass.yaml b/deploy/kubernetes/storageclass.yaml index 6317fcf..bf03d0a 100644 --- a/deploy/kubernetes/storageclass.yaml +++ b/deploy/kubernetes/storageclass.yaml @@ -9,5 +9,6 @@ metadata: provisioner: csi.spdk.io parameters: fsType: ext4 + pool_name: pool1 reclaimPolicy: Delete volumeBindingMode: Immediate diff --git a/pkg/spdk/controllerserver.go b/pkg/spdk/controllerserver.go index 9087567..52a6341 100644 --- a/pkg/spdk/controllerserver.go +++ b/pkg/spdk/controllerserver.go @@ -37,13 +37,13 @@ var errVolumeInCreation = status.Error(codes.Internal, "volume in creation") type controllerServer struct { *csicommon.DefaultControllerServer - spdkNodes map[string]util.SpdkNode // all spdk nodes in cluster volumeLocks *util.VolumeLocks + spdkNode *util.NodeNVMf } type spdkVolume struct { lvolID string - nodeName string + poolName string } func (cs *controllerServer) CreateVolume(_ context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -139,13 +139,13 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna return nil, err } - snapshotID, err := cs.spdkNodes[spdkVol.nodeName].CreateSnapshot(spdkVol.lvolID, snapshotName) + snapshotID, err := cs.spdkNode.CreateSnapshot(spdkVol.lvolID, snapshotName) if err != nil { klog.Errorf("failed to create snapshot, volumeID: %s snapshotName: %s err: %v", volumeID, snapshotName, err) return nil, status.Error(codes.Internal, err.Error()) } - volInfo, err := cs.spdkNodes[spdkVol.nodeName].VolumeInfo(spdkVol.lvolID) + volInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID) if err != nil { klog.Errorf("failed to get volume info, volumeID: %s err: %v", volumeID, err) return nil, status.Error(codes.Internal, err.Error()) @@ -158,7 +158,7 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna creationTime := timestamppb.Now() snapshotData := csi.Snapshot{ SizeBytes: size, - SnapshotId: fmt.Sprintf("%s:%s", spdkVol.nodeName, snapshotID), + SnapshotId: fmt.Sprintf("%s:%s", spdkVol.poolName, snapshotID), SourceVolumeId: spdkVol.lvolID, CreationTime: creationTime, ReadyToUse: true, @@ -180,7 +180,7 @@ func (cs *controllerServer) DeleteSnapshot(_ context.Context, req *csi.DeleteSna return nil, err } - err = cs.spdkNodes[spdkVol.nodeName].DeleteSnapshot(spdkVol.lvolID) + err = cs.spdkNode.DeleteSnapshot(spdkVol.lvolID) if err != nil { klog.Errorf("failed to delete snapshot, snapshotID: %s err: %v", snapshotID, err) return nil, status.Error(codes.Internal, err.Error()) @@ -202,37 +202,18 @@ func (cs *controllerServer) createVolume(req *csi.CreateVolumeRequest) (*csi.Vol ContentSource: req.GetVolumeContentSource(), } - // check all SPDK nodes to see if the volume has already been created - for nodeName, node := range cs.spdkNodes { - lvStores, err := node.LvStores() - if err != nil { - return nil, fmt.Errorf("get lvstores of node:%s failed: %w", nodeName, err) - } - for lvsIdx := range lvStores { - volumeID, err := node.GetVolume(req.GetName(), lvStores[lvsIdx].Name) - if err == nil { - vol.VolumeId = fmt.Sprintf("%s:%s", nodeName, volumeID) - return &vol, nil - } - } + pool_name := req.GetParameters()["pool_name"] + volumeID, err := cs.spdkNode.GetVolume(req.GetName(), pool_name) + if err == nil { + vol.VolumeId = fmt.Sprintf("%s:%s", pool_name, volumeID) + return &vol, nil } - // schedule suitable node:lvstore - nodeName, lvstore, err := cs.schedule(sizeMiB) + volumeID, err = cs.spdkNode.CreateVolume(req.GetName(), pool_name, sizeMiB) if err != nil { return nil, err } - - // TODO: re-schedule on ErrJSONNoSpaceLeft per optimistic concurrency control - volumeID, err := cs.spdkNodes[nodeName].CreateVolume(req.GetName(), lvstore, sizeMiB) - if err != nil { - return nil, err - } - // in the subsequent DeleteVolume() request, a nodeName needs to be specified, - // but the current CSI mechanism only passes the VolumeId to DeleteVolume(). - // therefore, the nodeName is included as part of the VolumeId. - vol.VolumeId = fmt.Sprintf("%s:%s", nodeName, volumeID) - + vol.VolumeId = fmt.Sprintf("%s:%s", pool_name, volumeID) return &vol, nil } @@ -245,11 +226,11 @@ func getSPDKVol(csiVolumeID string) (*spdkVolume, error) { ids := strings.Split(csiVolumeID, ":") if len(ids) == 2 { return &spdkVolume{ - nodeName: ids[0], + poolName: ids[0], lvolID: ids[1], }, nil } - return nil, fmt.Errorf("missing nodeName in volume: %s", csiVolumeID) + return nil, fmt.Errorf("missing poolName in volume: %s", csiVolumeID) } func (cs *controllerServer) publishVolume(volumeID string) (map[string]string, error) { @@ -257,12 +238,12 @@ func (cs *controllerServer) publishVolume(volumeID string) (map[string]string, e if err != nil { return nil, err } - err = cs.spdkNodes[spdkVol.nodeName].PublishVolume(spdkVol.lvolID) + err = cs.spdkNode.PublishVolume(spdkVol.lvolID) if err != nil { return nil, err } - volumeInfo, err := cs.spdkNodes[spdkVol.nodeName].VolumeInfo(spdkVol.lvolID) + volumeInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID) if err != nil { cs.unpublishVolume(volumeID) //nolint:errcheck // we can do little return nil, err @@ -275,7 +256,7 @@ func (cs *controllerServer) deleteVolume(volumeID string) error { if err != nil { return err } - return cs.spdkNodes[spdkVol.nodeName].DeleteVolume(spdkVol.lvolID) + return cs.spdkNode.DeleteVolume(spdkVol.lvolID) } func (cs *controllerServer) unpublishVolume(volumeID string) error { @@ -283,47 +264,23 @@ func (cs *controllerServer) unpublishVolume(volumeID string) error { if err != nil { return err } - return cs.spdkNodes[spdkVol.nodeName].UnpublishVolume(spdkVol.lvolID) -} - -// simplest volume scheduler: find first node:lvstore with enough free space -func (cs *controllerServer) schedule(sizeMiB int64) (nodeName, lvstore string, err error) { - for name, spdkNode := range cs.spdkNodes { - // retrieve latest lvstore info from spdk node - lvstores, err := spdkNode.LvStores() - if err != nil { - klog.Errorf("failed to get lvstores from node %s: %s", spdkNode.Info(), err.Error()) - continue - } - // check if lvstore has enough free space - for i := range lvstores { - lvstore := &lvstores[i] - if lvstore.FreeSizeMiB > sizeMiB { - return name, lvstore.Name, nil - } - } - klog.Infof("not enough free space from node %s", spdkNode.Info()) - } - - return "", "", fmt.Errorf("failed to find node with enough free space") + return cs.spdkNode.UnpublishVolume(spdkVol.lvolID) } func newControllerServer(d *csicommon.CSIDriver) (*controllerServer, error) { server := controllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), - spdkNodes: map[string]util.SpdkNode{}, + spdkNode: util.NodeNVMf{}, volumeLocks: util.NewVolumeLocks(), } // get spdk node configs, see deploy/kubernetes/config-map.yaml //nolint:tagliatelle // not using json:snake case var config struct { - Nodes []struct { - Name string `json:"name"` - URL string `json:"rpcURL"` - TargetType string `json:"targetType"` - TargetAddr string `json:"targetAddr"` - } `json:"Nodes"` + Simplybk struct { + Uuid string `json:"uuid"` + Ip string `json:"ip"` + } `json:"simplybk"` } configFile := util.FromEnv("SPDKCSI_CONFIG", "/etc/spdkcsi-config/config.json") err := util.ParseJSONFile(configFile, &config) @@ -331,14 +288,10 @@ func newControllerServer(d *csicommon.CSIDriver) (*controllerServer, error) { return nil, err } - // get spdk node secrets, see deploy/kubernetes/secret.yaml - //nolint:tagliatelle // not using json:snake case var secret struct { - Tokens []struct { - Name string `json:"name"` - UserName string `json:"username"` - Password string `json:"password"` - } `json:"rpcTokens"` + Simplybk struct { + Secret string `json:"secret"` + } `json:"simplybk"` } secretFile := util.FromEnv("SPDKCSI_SECRET", "/etc/spdkcsi-secret/secret.json") err = util.ParseJSONFile(secretFile, &secret) @@ -346,32 +299,62 @@ func newControllerServer(d *csicommon.CSIDriver) (*controllerServer, error) { return nil, err } - // create spdk nodes - for i := range config.Nodes { - node := &config.Nodes[i] - tokenFound := false - // find secret per node - for j := range secret.Tokens { - token := &secret.Tokens[j] - if token.Name == node.Name { - tokenFound = true - spdkNode, err := util.NewSpdkNode(node.URL, token.UserName, token.Password, node.TargetType, node.TargetAddr) - if err != nil { - klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error()) - } else { - klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL) - server.spdkNodes[node.Name] = spdkNode - } - break - } - } - if !tokenFound { - klog.Errorf("failed to find secret for spdk node %s", node.Name) - } - } - if len(server.spdkNodes) == 0 { + spdkNode, err := util.newNVMf(config.Simplybk.Uuid, config.Simplybk.Ip, secret.Simplybk.Secret) + if err != nil { + klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error()) return nil, fmt.Errorf("no valid spdk node found") + } else { + klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL) + server.spdkNode = spdkNode + return &server, nil } - return &server, nil + // create spdk nodes + //for i := range config.Nodes { + // node := &config.Nodes[i] + // tokenFound := false + // // find secret per node + // for j := range secret.Tokens { + // token := &secret.Tokens[j] + // if token.Name == node.Name { + // tokenFound = true + // spdkNode, err := util.NewSpdkNode(node.URL, token.UserName, token.Password, node.TargetType, node.TargetAddr) + // if err != nil { + // klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error()) + // } else { + // klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL) + // server.spdkNodes[node.Name] = spdkNode + // } + // break + // } + // } + // if !tokenFound { + // klog.Errorf("failed to find secret for spdk node %s", node.Name) + // } + //} + //if len(server.spdkNodes) == 0 { + // return nil, fmt.Errorf("no valid spdk node found") + //} + // + //return &server, nil +} + +func (cs *DefaultControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *DefaultControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *DefaultControllerServer) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *DefaultControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *DefaultControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") } diff --git a/pkg/util/config.go b/pkg/util/config.go index c997974..34d4067 100644 --- a/pkg/util/config.go +++ b/pkg/util/config.go @@ -18,7 +18,7 @@ package util const ( // TODO: move hardcoded settings to config map - cfgRPCTimeoutSeconds = 20 + cfgRPCTimeoutSeconds = 60 cfgLvolClearMethod = "unmap" // none, unmap, write_zeroes cfgLvolThinProvision = true cfgNVMfSvcPort = "4420" diff --git a/pkg/util/jsonrpc.go b/pkg/util/jsonrpc.go index 5b5c139..7bf13f2 100644 --- a/pkg/util/jsonrpc.go +++ b/pkg/util/jsonrpc.go @@ -25,7 +25,6 @@ import ( "strconv" "strings" "sync/atomic" - "time" ) // SpdkNode defines interface for SPDK storage node @@ -124,35 +123,35 @@ var ( // jsonrpc http proxy type rpcClient struct { - rpcURL string - rpcUser string - rpcPass string - httpClient *http.Client - rpcID int32 // json request message ID, auto incremented + cluster_id string + cluster_ip string + cluster_secret string + httpClient *http.Client + rpcID int32 // json request message ID, auto incremented } -func NewSpdkNode(rpcURL, rpcUser, rpcPass, targetType, targetAddr string) (SpdkNode, error) { - client := rpcClient{ - rpcURL: rpcURL, - rpcUser: rpcUser, - rpcPass: rpcPass, - httpClient: &http.Client{Timeout: cfgRPCTimeoutSeconds * time.Second}, - } - - switch strings.ToLower(targetType) { - case "nvme-rdma": - return newNVMf(&client, "RDMA", targetAddr), nil - case "nvme-tcp": - return newNVMf(&client, "TCP", targetAddr), nil - case "iscsi": - return newISCSI(&client, targetAddr), nil - default: - return nil, fmt.Errorf("unknown transport: %s", targetType) - } -} +//func NewSpdkNode(rpcURL, rpcUser, rpcPass, targetType, targetAddr string) (SpdkNode, error) { +// client := rpcClient{ +// rpcURL: rpcURL, +// rpcUser: rpcUser, +// rpcPass: rpcPass, +// httpClient: &http.Client{Timeout: cfgRPCTimeoutSeconds * time.Second}, +// } +// +// switch strings.ToLower(targetType) { +// case "nvme-rdma": +// return newNVMf(&client, "RDMA", targetAddr), nil +// case "nvme-tcp": +// return newNVMf(&client, "TCP", targetAddr), nil +// case "iscsi": +// return newISCSI(&client, targetAddr), nil +// default: +// return nil, fmt.Errorf("unknown transport: %s", targetType) +// } +//} func (client *rpcClient) info() string { - return client.rpcURL + return client.cluster_id } func (client *rpcClient) lvStores() ([]LvStore, error) { @@ -359,12 +358,12 @@ func (client *rpcClient) call(method string, args, result interface{}) error { return fmt.Errorf("%s: %w", method, err) } - req, err := http.NewRequest(http.MethodPost, client.rpcURL, bytes.NewReader(data)) + req, err := http.NewRequest(http.MethodPost, client.cluster_ip, bytes.NewReader(data)) if err != nil { return fmt.Errorf("%s: %w", method, err) } - req.SetBasicAuth(client.rpcUser, client.rpcPass) + //req.SetBasicAuth(client.rpcUser, client.rpcPass) req.Header.Set("Content-Type", "application/json") resp, err := client.httpClient.Do(req) @@ -426,12 +425,14 @@ func (client *rpcClient) callSBCLI(method string, path string, args, result inte } } - requestURL := fmt.Sprintf("%s/%s", client.rpcURL, path) + requestURL := fmt.Sprintf("http://%s/%s", client.cluster_ip, path) req, err := http.NewRequest(method, requestURL, bytes.NewReader(data)) if err != nil { return fmt.Errorf("%s: %w", method, err) } + req.Header.Add("cluster_id", client.cluster_id) + req.Header.Add("secret", client.cluster_secret) //req.SetBasicAuth(client.rpcUser, client.rpcPass) req.Header.Set("Content-Type", "application/json") diff --git a/pkg/util/nvmf.go b/pkg/util/nvmf.go index 88245d8..083e16e 100644 --- a/pkg/util/nvmf.go +++ b/pkg/util/nvmf.go @@ -21,34 +21,42 @@ import ( "k8s.io/klog" ) -type nodeNVMf struct { +type NodeNVMf struct { client *rpcClient - targetType string // RDMA, TCP - targetAddr string - targetPort string + cluster_ip string + cluster_id string + cluster_secret string } -func newNVMf(client *rpcClient, targetType, targetAddr string) *nodeNVMf { - return &nodeNVMf{ - client: client, - targetType: targetType, - targetAddr: targetAddr, - targetPort: cfgNVMfSvcPort, +// func newNVMf(client *rpcClient, targetType, targetAddr string) *nodeNVMf { +// config.Simplybk.Uuid, config.Simplybk.Ip, secret.Simplybk.Secret +func newNVMf(cluster_id, cluster_ip, cluster_secret string) *NodeNVMf { + client := rpcClient{ + cluster_id: cluster_id, + cluster_ip: cluster_ip, + cluster_secret: cluster_secret, + httpClient: &http.Client{Timeout: cfgRPCTimeoutSeconds * time.Second}, + } + return &NodeNVMf{ + client: &client, + cluster_id: cluster_id, + cluster_ip: cluster_ip, + cluster_secret: cluster_secret, } } -func (node *nodeNVMf) Info() string { +func (node *NodeNVMf) Info() string { return node.client.info() } -func (node *nodeNVMf) LvStores() ([]LvStore, error) { +func (node *NodeNVMf) LvStores() ([]LvStore, error) { return node.client.lvStores() } // VolumeInfo returns a string:string map containing information necessary // for CSI node(initiator) to connect to this target and identify the disk. -func (node *nodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) { +func (node *NodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) { lvol, err := node.client.getVolumeInfo(lvolID) if err != nil { return nil, err @@ -58,7 +66,7 @@ func (node *nodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) { } // CreateVolume creates a logical volume and returns volume ID -func (node *nodeNVMf) CreateVolume(lvolName, lvsName string, sizeMiB int64) (string, error) { +func (node *NodeNVMf) CreateVolume(lvolName, lvsName string, sizeMiB int64) (string, error) { // all volume have an alias ID named lvsName/lvolName lvol, err := node.client.getVolume(fmt.Sprintf("%s/%s", lvsName, lvolName)) if err == nil { @@ -75,19 +83,19 @@ func (node *nodeNVMf) CreateVolume(lvolName, lvsName string, sizeMiB int64) (str } // GetVolume returns the volume id of the given volume name and lvstore name. return error if not found. -func (node *nodeNVMf) GetVolume(lvolName, lvsName string) (string, error) { - lvol, err := node.client.getVolume(fmt.Sprintf("%s/%s", lvsName, lvolName)) +func (node *NodeNVMf) GetVolume(lvolName, pool_name string) (string, error) { + lvol, err := node.client.getVolume(fmt.Sprintf("%s/%s", pool_name, lvolName)) if err != nil { return "", err } return lvol.UUID, err } -func (node *nodeNVMf) isVolumeCreated(lvolID string) (bool, error) { +func (node *NodeNVMf) isVolumeCreated(lvolID string) (bool, error) { return node.client.isVolumeCreated(lvolID) } -func (node *nodeNVMf) CreateSnapshot(lvolID, snapshotName string) (string, error) { +func (node *NodeNVMf) CreateSnapshot(lvolID, snapshotName string) (string, error) { snapshotID, err := node.client.snapshot(lvolID, snapshotName) if err != nil { return "", err @@ -96,7 +104,7 @@ func (node *nodeNVMf) CreateSnapshot(lvolID, snapshotName string) (string, error return snapshotID, nil } -func (node *nodeNVMf) DeleteVolume(lvolID string) error { +func (node *NodeNVMf) DeleteVolume(lvolID string) error { err := node.client.deleteVolume(lvolID) if err != nil { return err @@ -105,7 +113,7 @@ func (node *nodeNVMf) DeleteVolume(lvolID string) error { return nil } -func (node *nodeNVMf) DeleteSnapshot(snapshotID string) error { +func (node *NodeNVMf) DeleteSnapshot(snapshotID string) error { err := node.client.deleteSnapshot(snapshotID) if err != nil { return err @@ -115,7 +123,7 @@ func (node *nodeNVMf) DeleteSnapshot(snapshotID string) error { } // PublishVolume exports a volume through NVMf target -func (node *nodeNVMf) PublishVolume(lvolID string) error { +func (node *NodeNVMf) PublishVolume(lvolID string) error { var isPublished bool //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) err := node.client.callSBCLI("GET", "csi/publish_volume/"+lvolID, nil, &isPublished) @@ -164,7 +172,7 @@ func (node *nodeNVMf) PublishVolume(lvolID string) error { return nil } -func (node *nodeNVMf) isVolumePublished(lvolID string) (bool, error) { +func (node *NodeNVMf) isVolumePublished(lvolID string) (bool, error) { var isPublished bool //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) err := node.client.callSBCLI("GET", "csi/is_volume_published/"+lvolID, nil, &isPublished) @@ -178,7 +186,7 @@ func (node *nodeNVMf) isVolumePublished(lvolID string) (bool, error) { return isPublished, nil } -func (node *nodeNVMf) UnpublishVolume(lvolID string) error { +func (node *NodeNVMf) UnpublishVolume(lvolID string) error { var isPublished bool //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) err := node.client.callSBCLI("GET", "csi/unpublish_volume/"+lvolID, nil, &isPublished)