Skip to content

Commit

Permalink
cephfs: use a new data subdirectory in NodePublishVolume
Browse files Browse the repository at this point in the history
The new "data" subdirectory in the volume is bind-mounted inside the
container. The root of the volume is not available for applications, so
that Ceph-CSI can create a health-check file there and it is hidden from
the users.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Sep 14, 2023
1 parent 0653232 commit 98835b2
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 14 deletions.
18 changes: 18 additions & 0 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ import (
"google.golang.org/grpc/status"
)

// defaultDataRoot is a directory on the volume that will be mounted inside the
// container during NodePublishVolume. The parent directory od defaultDataRoot
// will be mounted during NodeStageVolume, and is available for Ceph-CSI
// internal consumption,
const defaultDataRoot = "data"

// ControllerServer struct of CEPH CSI driver with supported methods of CSI
// controller server spec.
type ControllerServer struct {
Expand Down Expand Up @@ -365,6 +371,12 @@ func (cs *ControllerServer) CreateVolume(
volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath

if volOptions.DataRoot == "" {
volOptions.DataRoot = defaultDataRoot
}
volumeContext["dataRoot"] = volOptions.DataRoot

volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
Expand Down Expand Up @@ -456,6 +468,12 @@ func (cs *ControllerServer) CreateVolume(
volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath

if volOptions.DataRoot == "" {
volOptions.DataRoot = defaultDataRoot
}
volumeContext["dataRoot"] = volOptions.DataRoot

volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
Expand Down
32 changes: 26 additions & 6 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

healthCheckPath := getHealthCheckPath(stagingTargetPath, req.GetVolumeContext())

// Check if the volume is already mounted

if err = ns.tryRestoreFuseMountInNodeStage(ctx, mnt, stagingTargetPath); err != nil {
Expand All @@ -230,7 +232,7 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

ns.healthChecker.StartChecker(stagingTargetPath)
ns.healthChecker.StartChecker(healthCheckPath)

return &csi.NodeStageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -274,7 +276,7 @@ func (ns *NodeServer) NodeStageVolume(
}
}

ns.healthChecker.StartChecker(stagingTargetPath)
ns.healthChecker.StartChecker(healthCheckPath)

return &csi.NodeStageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -458,6 +460,15 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

// dataPath is the directory that will be bind-mounted into the
// container. If "dataRoot" is empty, the dataPath is the same as the
// stagingTargetPath.
dataPath := stagingTargetPath
dataRoot, ok := req.GetVolumeContext()["dataRoot"]
if ok {
dataPath = path.Join(dataPath, dataRoot)
}

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish

Expand All @@ -470,7 +481,7 @@ func (ns *NodeServer) NodePublishVolume(
if err := ns.tryRestoreFuseMountsInNodePublish(
ctx,
volID,
stagingTargetPath,
dataPath,
targetPath,
req.GetVolumeContext(),
); err != nil {
Expand Down Expand Up @@ -516,15 +527,15 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}
if encrypted {
stagingTargetPath = fscrypt.AppendEncyptedSubdirectory(stagingTargetPath)
if err = fscrypt.IsDirectoryUnlocked(stagingTargetPath, "ceph"); err != nil {
dataPath = fscrypt.AppendEncyptedSubdirectory(dataPath)
if err = fscrypt.IsDirectoryUnlocked(dataPath, "ceph"); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

if err = mounter.BindMount(
ctx,
stagingTargetPath,
dataPath,
targetPath,
req.GetReadonly(),
mountOptions); err != nil {
Expand Down Expand Up @@ -745,3 +756,12 @@ func (ns *NodeServer) NodeGetVolumeStats(

return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
}

func getHealthCheckPath(basedir string, volumeContext map[string]string) string{
_, ok := volumeContext["dataRoot"]
if !ok {
return path.Join(basedir, ".meta.csi")
}

return basedir
}
10 changes: 10 additions & 0 deletions internal/cephfs/store/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type VolumeOptions struct {

ProvisionVolume bool `json:"provisionVolume"`
BackingSnapshot bool `json:"backingSnapshot"`

// DataRoot is set to the directory that is bind-mounted into the
// container. The parent directory of the DataRoot is not available for
// the end-user, but Ceph-CSI can use it for storing state, doing
// health-checks and the like.
DataRoot string `json:dataRoot`
}

// Connect a CephFS volume to the Ceph cluster.
Expand Down Expand Up @@ -266,6 +272,10 @@ func NewVolumeOptions(
return nil, err
}

if err = extractOptionalOption(&opts.DataRoot, "dataRoot", volOptions); err != nil {
return nil, err
}

if err = opts.InitKMS(ctx, volOptions, req.GetSecrets()); err != nil {
return nil, fmt.Errorf("failed to init KMS: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions internal/health-checker/filechecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,22 @@ type fileChecker struct {
commands chan command
}

func newFileChecker(dir string) ConditionChecker {
func newFileChecker(dir string) (ConditionChecker, error) {
_, err := os.Stat(dir)
if os.IsNotExist(err) {
err = os.MkdirAll(dir, 0755)
}
if err != nil {
return nil, err
}

return &fileChecker{
filename: path.Join(dir, "csi-volume-condition.ts"),
healthy: true,
interval: 120 * time.Second,
lastUpdate: time.Now(),
commands: make(chan command),
}
}, nil
}

// runChecker is an endless loop that writes a timestamp and reads it back from
Expand Down
19 changes: 14 additions & 5 deletions internal/health-checker/filechecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ limitations under the License.
package healthchecker

import (
"path"
"testing"
"time"
)

func TestFileChecker(t *testing.T) {
t.Parallel()

volumePath := t.TempDir()
fc := newFileChecker(volumePath)
volumePath := path.Join(t.TempDir(), "csi-health-check.d")
fc, err := newFileChecker(volumePath)
if err != nil {
t.Fatalf("failed to create FileChecker: %v", err)
}

checker := fc.(*fileChecker)
checker.interval = time.Second * 5

Expand Down Expand Up @@ -55,12 +60,16 @@ func TestFileChecker(t *testing.T) {
func TestWriteReadTimestamp(t *testing.T) {
t.Parallel()

volumePath := t.TempDir()
fc := newFileChecker(volumePath)
volumePath := path.Join(t.TempDir(), "csi-health-check.d")
fc, err := newFileChecker(volumePath)
if err != nil {
t.Fatalf("failed to create FileChecker: %v", err)
}

checker := fc.(*fileChecker)
ts := time.Now()

err := checker.writeTimestamp(ts)
err = checker.writeTimestamp(ts)
if err != nil {
t.Fatalf("failed to write timestamp: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion internal/health-checker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func NewHealthCheckManager() Manager {
}

func (hcm *healthCheckManager) StartChecker(path string) error {
cc := newFileChecker(path)
cc, err := newFileChecker(path)
if err != nil {
return err
}

// load the 'old' ConditionChecker if it exists, otherwuse store 'cc'
old, ok := hcm.checkers.LoadOrStore(path, cc)
Expand Down

0 comments on commit 98835b2

Please sign in to comment.