cephfs: add subvolume path to volume context

There are many usecases with adding the subvolume
path to the PV object. the volume context returned
in the createVolumeResponse is added to the PV object
by the external provisioner.

More Details about the usecases are in below link
https://github.com/rook/rook/issues/5471

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-02-08 12:45:23 +05:30 committed by mergify[bot]
parent 6256be0ce0
commit 8cd901d2dd
2 changed files with 36 additions and 7 deletions

View File

@ -207,6 +207,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
} }
volumeContext := req.GetParameters() volumeContext := req.GetParameters()
volumeContext["subvolumeName"] = vID.FsSubvolName volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath
volume := &csi.Volume{ volume := &csi.Volume{
VolumeId: vID.VolumeID, VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size, CapacityBytes: volOptions.Size,
@ -250,10 +251,31 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
} }
return nil, err return nil, err
} }
volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vID.FsSubvolName))
if err != nil {
purgeErr := volOptions.purgeVolume(ctx, volumeID(vID.FsSubvolName), true)
if purgeErr != nil {
util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, ErrVolumeNotFound) {
// If the subvolume deletion is failed, we should not cleanup
// the OMAP entry it will stale subvolume in cluster.
// set err=nil so that when we get the request again we can get
// the subvolume info.
err = nil
return nil, status.Error(codes.Internal, purgeErr.Error())
}
}
util.ErrorLog(ctx, "failed to get subvolume path %s: %v", vID.FsSubvolName, err)
return nil, status.Error(codes.Internal, err.Error())
}
util.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s", util.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s",
vID.FsSubvolName, requestName) vID.FsSubvolName, requestName)
volumeContext := req.GetParameters() volumeContext := req.GetParameters()
volumeContext["subvolumeName"] = vID.FsSubvolName volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath
volume := &csi.Volume{ volume := &csi.Volume{
VolumeId: vID.VolumeID, VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size, CapacityBytes: volOptions.Size,

View File

@ -129,16 +129,23 @@ func checkVolExists(ctx context.Context,
if cloneState != cephFSCloneComplete { if cloneState != cephFSCloneComplete {
return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName) return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName)
} }
} else { }
_, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName)) volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName))
if err != nil { if err != nil {
if errors.Is(err, ErrVolumeNotFound) { if errors.Is(err, ErrVolumeNotFound) {
err = j.UndoReservation(ctx, volOptions.MetadataPool, // If the subvolume is not present, cleanup the stale snapshot
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) // created for clone.
return nil, err if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(ctx, volumeID(pvID.FsSubvolName), volumeID(vid.FsSubvolName), parentVolOpt)
if err != nil {
return nil, err
}
} }
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err return nil, err
} }
return nil, err
} }
// check if topology constraints match what is found // check if topology constraints match what is found