diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go index 3119a2474..035a161a4 100644 --- a/pkg/cephfs/volumemounter.go +++ b/pkg/cephfs/volumemounter.go @@ -17,11 +17,15 @@ limitations under the License. package cephfs import ( - "bytes" "errors" "fmt" "os" "os/exec" + "regexp" + "strconv" + "sync" + + "k8s.io/klog" ) const ( @@ -31,6 +35,12 @@ const ( var ( availableMounters []string + + // maps a mountpoint to PID of its FUSE daemon + fusePidMap = make(map[string]int) + fusePidMapMtx sync.Mutex + + fusePidRx = regexp.MustCompile(`(?m)^ceph-fuse\[(.+)\]: starting fuse$`) ) // Load available ceph mounters installed on system into availableMounters @@ -116,10 +126,24 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, vo return err } - if !bytes.Contains(stderr, []byte("starting fuse")) { + // Parse the output: + // We need "starting fuse" meaning the mount is ok + // and PID of the ceph-fuse daemon for unmount + + match := fusePidRx.FindSubmatch(stderr) + if len(match) != 2 { return fmt.Errorf("ceph-fuse failed: %s", stderr) } + pid, err := strconv.Atoi(string(match[1])) + if err != nil { + return fmt.Errorf("failed to parse FUSE daemon PID: %v", err) + } + + fusePidMapMtx.Lock() + fusePidMap[mountPoint] = pid + fusePidMapMtx.Unlock() + return nil } @@ -173,7 +197,29 @@ func bindMount(from, to string, readOnly bool) error { } func unmountVolume(mountPoint string) error { - return execCommandErr("umount", mountPoint) + if err := execCommandErr("umount", mountPoint); err != nil { + return err + } + + fusePidMapMtx.Lock() + pid, ok := fusePidMap[mountPoint] + if ok { + delete(fusePidMap, mountPoint) + } + fusePidMapMtx.Unlock() + + if ok { + p, err := os.FindProcess(pid) + if err != nil { + klog.Warningf("failed to find process %d: %v", pid, err) + } else { + if _, err = p.Wait(); err != nil { + klog.Warningf("%d is not a child process: %v", pid, err) + } + } + } + + return nil } func createMountPoint(root string) error {