Skip to content

Commit

Permalink
Make Node{Stage,Publish}Volume RPCs idempotent on kubelet restarts (#752
Browse files Browse the repository at this point in the history
)

When kubelet is restarted, the NodeStageVolume/NodePublishVolume RPCs are
called again with an existing staging-target-path and target-path.
Previously, the respective RPCs returned errors stating that the
staging-target-path/target-path was already mounted.

This PR fixes the issue.

Signed-off-by: Bala.FA <[email protected]>
  • Loading branch information
balamurugana authored Apr 12, 2023
1 parent 6db246b commit 378b953
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 51 deletions.
4 changes: 2 additions & 2 deletions pkg/csi/node/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func createFakeServer() *Server {
rack: "test-rack",
zone: "test-zone",
region: "test-region",
getMounts: func() (map[string]utils.StringSet, error) {
return map[string]utils.StringSet{consts.MountRootDir: nil}, nil
getMounts: func() (map[string]utils.StringSet, map[string]utils.StringSet, error) {
return map[string]utils.StringSet{consts.MountRootDir: nil}, map[string]utils.StringSet{consts.MountRootDir: nil}, nil
},
getDeviceByFSUUID: func(fsuuid string) (string, error) { return "", nil },
bindMount: func(source, target string, readOnly bool) error { return nil },
Expand Down
16 changes: 11 additions & 5 deletions pkg/csi/node/publish_unpublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
}

mountPointMap, err := server.getMounts()
mountPointMap, _, err := server.getMounts()
if err != nil {
klog.ErrorS(err, "unable to get mounts")
return nil, status.Error(codes.Internal, err.Error())
}
if _, found := mountPointMap[req.GetStagingTargetPath()]; !found {

stagingTargetPathDevices, found := mountPointMap[req.GetStagingTargetPath()]
if !found {
klog.Errorf("stagingPath %v is not mounted", req.GetStagingTargetPath())
return nil, status.Error(codes.Internal, fmt.Sprintf("stagingPath %v is not mounted", req.GetStagingTargetPath()))
}
Expand All @@ -140,9 +142,13 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Errorf(codes.Internal, "unable to create target path: %v", err)
}

if err := server.bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
klog.ErrorS(err, "unable to bind mount staging target path to target path", "StagingTargetPath", req.GetStagingTargetPath(), "TargetPath", req.GetTargetPath())
return nil, status.Errorf(codes.Internal, "unable to bind mount staging target path to target path; %v", err)
if targetPathDevices, found := mountPointMap[req.GetTargetPath()]; found && targetPathDevices.Equal(stagingTargetPathDevices) {
klog.V(5).InfoS("stagingTargetPath is already bind-mounted to targetPath", "stagingTargetPath", req.GetStagingTargetPath(), "targetPath", req.GetTargetPath())
} else {
if err := server.bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
klog.ErrorS(err, "unable to bind mount staging target path to target path", "StagingTargetPath", req.GetStagingTargetPath(), "TargetPath", req.GetTargetPath())
return nil, status.Errorf(codes.Internal, "unable to bind mount staging target path to target path; %v", err)
}
}

volume.Status.TargetPath = req.GetTargetPath()
Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/node/publish_unpublish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func TestPublishUnpublishVolume(t *testing.T) {
ns := createFakeServer()

// Publish volume test
ns.getMounts = func() (map[string]utils.StringSet, error) {
return map[string]utils.StringSet{testStagingPath: nil}, nil
ns.getMounts = func() (map[string]utils.StringSet, map[string]utils.StringSet, error) {
return map[string]utils.StringSet{testStagingPath: nil}, map[string]utils.StringSet{testStagingPath: nil}, nil
}
_, err := ns.NodePublishVolume(ctx, &publishVolumeRequest)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/csi/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Server struct {
zone string
region string

getMounts func() (map[string]utils.StringSet, error)
getMounts func() (mountMap, rootMap map[string]utils.StringSet, err error)
getDeviceByFSUUID func(fsuuid string) (string, error)
bindMount func(source, target string, readOnly bool) error
unmount func(target string) error
Expand All @@ -59,8 +59,8 @@ func newServer(identity string, nodeID directpvtypes.NodeID, rack, zone, region
zone: zone,
region: region,

getMounts: func() (mountMap map[string]utils.StringSet, err error) {
mountMap, _, _, err = sys.GetMounts(false)
getMounts: func() (mountMap, rootMap map[string]utils.StringSet, err error) {
mountMap, _, _, rootMap, err = sys.GetMounts(false)
return
},
getDeviceByFSUUID: sys.GetDeviceByFSUUID,
Expand Down
5 changes: 5 additions & 0 deletions pkg/csi/node/stage_unstage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -61,6 +62,10 @@ func (server *Server) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
server.mkdir,
server.setQuota,
server.bindMount,
func() (rootMap map[string]utils.StringSet, err error) {
_, rootMap, err = server.getMounts()
return
},
)
if err != nil {
return nil, status.Error(code, err.Error())
Expand Down
8 changes: 4 additions & 4 deletions pkg/csi/node/stage_unstage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func TestNodeStageVolume(t *testing.T) {
client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())

nodeServer := createFakeServer()
nodeServer.getMounts = func() (map[string]utils.StringSet, error) {
return testCase.mountInfo, nil
nodeServer.getMounts = func() (map[string]utils.StringSet, map[string]utils.StringSet, error) {
return testCase.mountInfo, testCase.mountInfo, nil
}
nodeServer.bindMount = func(source, stagingTargetPath string, readOnly bool) error {
if _, found := testCase.mountInfo[source]; !found {
Expand Down Expand Up @@ -147,8 +147,8 @@ func TestStageUnstageVolume(t *testing.T) {
ctx := context.TODO()
ns := createFakeServer()
dataPath := path.Join(consts.MountRootDir, testDriveName, ".FSUUID."+testDriveName, testVolumeName50MB)
ns.getMounts = func() (map[string]utils.StringSet, error) {
return map[string]utils.StringSet{consts.MountRootDir: nil}, nil
ns.getMounts = func() (map[string]utils.StringSet, map[string]utils.StringSet, error) {
return map[string]utils.StringSet{consts.MountRootDir: nil}, map[string]utils.StringSet{consts.MountRootDir: nil}, nil
}

// Stage Volume test
Expand Down
4 changes: 2 additions & 2 deletions pkg/device/probe_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func probe() (devices []Device, err error) {
return nil, err
}

_, deviceMountMap, majorMinorMap, err := sys.GetMounts(true)
_, deviceMountMap, majorMinorMap, _, err := sys.GetMounts(true)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +115,7 @@ func probe() (devices []Device, err error) {
}

func probeDevices(majorMinor ...string) (devices []Device, err error) {
_, deviceMountMap, majorMinorMap, err := sys.GetMounts(true)
_, deviceMountMap, majorMinorMap, _, err := sys.GetMounts(true)
if err != nil {
return nil, err
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/drive/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ func SetIOError(ctx context.Context, driveID directpvtypes.DriveID) error {
return retry.RetryOnConflict(retry.DefaultRetry, updateFunc)
}

// stageVolumeMount bind-mounts volumeDir on stagingTargetPath unless the
// mount is already recorded.
//
// It looks up "/"+volumeName in the map returned by getMounts; when that
// entry exists and its set contains stagingTargetPath, the function logs at
// verbosity 5 and returns nil without calling bindMount. Otherwise it
// returns the result of bindMount(volumeDir, stagingTargetPath, false).
// An error from getMounts is returned unchanged.
//
// NOTE(review): getMounts is presumably expected to return the mount-root to
// mount-points map; confirm against callers.
func stageVolumeMount(
	volumeName, volumeDir, stagingTargetPath string,
	getMounts func() (map[string]utils.StringSet, error),
	bindMount func(volumeDir, stagingTargetPath string, readOnly bool) error,
) error {
	mountsByRoot, err := getMounts()
	if err != nil {
		return err
	}

	// Mount only when no existing entry ties this volume to the staging path.
	targets, ok := mountsByRoot["/"+volumeName]
	if !ok || !targets.Exist(stagingTargetPath) {
		return bindMount(volumeDir, stagingTargetPath, false)
	}

	klog.V(5).InfoS("volumeDir is already bind-mounted on stagingTargetPath", "volumeDir", volumeDir, "stagingTargetPath", stagingTargetPath)
	return nil
}

// StageVolume creates and mounts staging target path of the volume to the drive.
func StageVolume(
ctx context.Context,
Expand All @@ -72,6 +90,7 @@ func StageVolume(
mkdir func(volumeDir string) error,
setQuota func(ctx context.Context, device, stagingTargetPath, volumeName string, quota xfs.Quota, update bool) error,
bindMount func(volumeDir, stagingTargetPath string, readOnly bool) error,
getMounts func() (map[string]utils.StringSet, error),
) (codes.Code, error) {
device, err := getDeviceByFSUUID(volume.Status.FSUUID)
if err != nil {
Expand Down Expand Up @@ -114,7 +133,7 @@ func StageVolume(
}

if stagingTargetPath != "" {
if err := bindMount(volumeDir, stagingTargetPath, false); err != nil {
if err := stageVolumeMount(volume.Name, volumeDir, stagingTargetPath, getMounts, bindMount); err != nil {
return codes.Internal, fmt.Errorf("unable to bind mount volume directory to staging target path; %w", err)
}
}
Expand All @@ -133,7 +152,7 @@ func StageVolume(

type driveEventHandler struct {
nodeID directpvtypes.NodeID
getMounts func() (mountPointMap, deviceMap map[string]utils.StringSet, err error)
getMounts func() (mountPointMap, deviceMap, rootMountPointMap map[string]utils.StringSet, err error)
unmount func(target string) error
mkdir func(path string) error
bindMount func(source, target string, readOnly bool) error
Expand All @@ -145,8 +164,8 @@ type driveEventHandler struct {
func newDriveEventHandler(nodeID directpvtypes.NodeID) *driveEventHandler {
return &driveEventHandler{
nodeID: nodeID,
getMounts: func() (mountPointMap, deviceMap map[string]utils.StringSet, err error) {
mountPointMap, deviceMap, _, err = sys.GetMounts(false)
getMounts: func() (mountPointMap, deviceMap, rootMountPointMap map[string]utils.StringSet, err error) {
mountPointMap, deviceMap, _, rootMountPointMap, err = sys.GetMounts(false)
return
},
unmount: func(mountPoint string) error {
Expand Down Expand Up @@ -189,7 +208,7 @@ func (handler *driveEventHandler) ObjectType() runtime.Object {
}

func (handler *driveEventHandler) unmountDrive(drive *types.Drive, skipDriveMount bool) error {
mountPointMap, deviceMap, err := handler.getMounts()
mountPointMap, deviceMap, _, err := handler.getMounts()
if err != nil {
return err
}
Expand Down Expand Up @@ -280,6 +299,10 @@ func (handler *driveEventHandler) move(ctx context.Context, drive *types.Drive)
handler.mkdir,
handler.setQuota,
handler.bindMount,
func() (rootMap map[string]utils.StringSet, err error) {
_, _, rootMap, err = handler.getMounts()
return
},
)
if err != nil && !errors.Is(err, os.ErrNotExist) {
klog.ErrorS(err, "unable to stage volume after volume move",
Expand Down
2 changes: 1 addition & 1 deletion pkg/initrequest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newInitRequestEventHandler(ctx context.Context, nodeID directpvtypes.NodeID
probeDevices: pkgdevice.Probe,
getDevices: pkgdevice.ProbeDevices,
getMounts: func() (deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
if _, deviceMap, majorMinorMap, err = sys.GetMounts(true); err != nil {
if _, deviceMap, majorMinorMap, _, err = sys.GetMounts(true); err != nil {
err = fmt.Errorf("unable get mount points; %w", err)
}
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/sys/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (e *ErrMountPointAlreadyMounted) Error() string {
}

// GetMounts returns mount-point to devices and devices to mount-point maps.
func GetMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
func GetMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap, rootMountPointMap map[string]utils.StringSet, err error) {
return getMounts(includeMajorMinorMap)
}

Expand Down
41 changes: 17 additions & 24 deletions pkg/sys/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ import (
"k8s.io/klog/v2"
)

func parseProc1Mountinfo(r io.Reader) (mountPointMap, deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
func parseProc1Mountinfo(r io.Reader) (mountPointMap, deviceMap, majorMinorMap, rootMountPointMap map[string]utils.StringSet, err error) {
reader := bufio.NewReader(r)

mountPointMap = make(map[string]utils.StringSet)
deviceMap = make(map[string]utils.StringSet)
majorMinorMap = make(map[string]utils.StringSet)
rootMountPointMap = make(map[string]utils.StringSet)
for {
s, err := reader.ReadString('\n')
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// Refer /proc/[pid]/mountinfo section in https://man7.org/linux/man-pages/man5/proc.5.html
Expand All @@ -64,6 +65,7 @@ func parseProc1Mountinfo(r io.Reader) (mountPointMap, deviceMap, majorMinorMap m
}

majorMinor := tokens[2]
root := tokens[3]
mountPoint := tokens[4]
device := tokens[i+1]

Expand All @@ -81,6 +83,11 @@ func parseProc1Mountinfo(r io.Reader) (mountPointMap, deviceMap, majorMinorMap m
majorMinorMap[majorMinor] = make(utils.StringSet)
}
majorMinorMap[majorMinor].Set(device)

if _, found := rootMountPointMap[root]; !found {
rootMountPointMap[root] = make(utils.StringSet)
}
rootMountPointMap[root].Set(mountPoint)
}

return
Expand All @@ -94,7 +101,7 @@ func getMajorMinor(device string) (majorMinor string, err error) {
return
}

func parseProcMounts(r io.Reader, includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
func parseProcMounts(r io.Reader, includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap, rootMountPointMap map[string]utils.StringSet, err error) {
reader := bufio.NewReader(r)

mountPointMap = make(map[string]utils.StringSet)
Expand All @@ -105,7 +112,7 @@ func parseProcMounts(r io.Reader, includeMajorMinorMap bool) (mountPointMap, dev
if errors.Is(err, io.EOF) {
break
}
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// Refer /proc/mounts section in https://man7.org/linux/man-pages/man5/proc.5.html
Expand Down Expand Up @@ -142,7 +149,7 @@ func parseProcMounts(r io.Reader, includeMajorMinorMap bool) (mountPointMap, dev

majorMinor, err := getMajorMinor(device)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

if _, found := majorMinorMap[device]; !found {
Expand All @@ -154,19 +161,19 @@ func parseProcMounts(r io.Reader, includeMajorMinorMap bool) (mountPointMap, dev
return
}

func getMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
func getMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap, rootMountPointMap map[string]utils.StringSet, err error) {
file, err := os.Open("/proc/1/mountinfo")
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
} else {
defer file.Close()
return parseProc1Mountinfo(file)
}

if file, err = os.Open("/proc/mounts"); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

defer file.Close()
Expand Down Expand Up @@ -197,7 +204,7 @@ var mountFlagMap = map[string]uintptr{
}

func mount(device, target, fsType string, flags []string, superBlockFlags string) error {
mountPointMap, _, _, err := getMounts(false)
mountPointMap, _, _, _, err := getMounts(false)
if err != nil {
return err
}
Expand All @@ -224,20 +231,6 @@ func mount(device, target, fsType string, flags []string, superBlockFlags string
}

func bindMount(source, target, fsType string, recursive, readOnly bool, superBlockFlags string) error {
mountPointMap, _, _, err := getMounts(false)
if err != nil {
return err
}

if devices, found := mountPointMap[target]; found {
if devices.Exist(source) {
klog.V(5).InfoS("source is already mounted on target", "source", source, "target", target, "fsType", fsType, "recursive", recursive, "readOnly", readOnly, "superBlockFlags", superBlockFlags)
return nil
}

return &ErrMountPointAlreadyMounted{MountPoint: target, Devices: devices.ToSlice()}
}

flags := mountFlagMap["bind"]
if recursive {
flags |= mountFlagMap["recursive"]
Expand All @@ -250,7 +243,7 @@ func bindMount(source, target, fsType string, recursive, readOnly bool, superBlo
}

func unmount(target string, force, detach, expire bool) error {
mountPointMap, _, _, err := getMounts(false)
mountPointMap, _, _, _, err := getMounts(false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sys/mount_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/minio/directpv/pkg/utils"
)

func getMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap map[string]utils.StringSet, err error) {
return nil, nil, nil, fmt.Errorf("unsupported operating system %v", runtime.GOOS)
func getMounts(includeMajorMinorMap bool) (mountPointMap, deviceMap, majorMinorMap, rootMountPointMap map[string]utils.StringSet, err error) {
return nil, nil, nil, nil, fmt.Errorf("unsupported operating system %v", runtime.GOOS)
}

func mount(device, target, fsType string, flags []string, superBlockFlags string) error {
Expand Down
15 changes: 15 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ func (set StringSet) ToSlice() (values []string) {
return
}

// Equal reports whether set and set2 contain exactly the same values.
func (set StringSet) Equal(set2 StringSet) (found bool) {
	if len(set2) != len(set) {
		return false
	}

	// The sizes match, so the sets are equal exactly when every value of set
	// is also present in set2.
	for item := range set {
		if _, ok := set2[item]; !ok {
			return false
		}
	}

	return true
}

// Eprintf prints the message to the stdout and stderr based on inputs
func Eprintf(quiet, asErr bool, format string, a ...any) {
if quiet {
Expand Down

0 comments on commit 378b953

Please sign in to comment.