cephfs: implement getSubVolumeInfo() with go-ceph

Fixes: #1551
Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2020-10-10 18:17:08 +02:00 committed by mergify[bot]
parent 429f7acd8f
commit 6a46b8f17f
3 changed files with 39 additions and 44 deletions

View File

@ -512,8 +512,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo too.
var info Subvolume
info, err = parentVolOptions.getSubVolumeInfo(ctx, cr, volumeID(vid.FsSubvolName))
info, err := parentVolOptions.getSubVolumeInfo(ctx, volumeID(vid.FsSubvolName))
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, its safe to evaluat as the filtering
@ -545,7 +544,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(info.BytesQuota),
SizeBytes: info.BytesQuota,
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
@ -575,7 +574,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(info.BytesQuota),
SizeBytes: info.BytesQuota,
SnapshotId: sID.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: snap.CreationTime,

View File

@ -18,6 +18,7 @@ package cephfs
import (
"context"
"fmt"
"path"
"strconv"
"strings"
@ -44,23 +45,12 @@ const (
modeAllRWX = 0777
)
// Subvolume holds subvolume information.
// Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo.
type Subvolume struct {
BytesQuota int `json:"bytes_quota"`
DataPool string `json:"data_pool"`
Features []string `json:"features"`
GID int `json:"gid"`
Mode int `json:"mode"`
MonAddrs []string `json:"mon_addrs"`
Path string `json:"path"`
PoolNamespace string `json:"pool_namespace"`
// The subvolume "state" is based on the current state of the subvolume.
// It contains one of the following values:
// * "complete": subvolume is ready for all operations.
// * "snapshot-retained": subvolume is removed but its snapshots are retained.
State string `json:"state"`
Type string `json:"type"`
UID int `json:"uid"`
BytesQuota int64
Path string
Features []string
}
func getVolumeRootPathCephDeprecated(volID volumeID) string {
@ -94,36 +84,43 @@ func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *u
return strings.TrimSuffix(stdout, "\n"), nil
}
func (vo *volumeOptions) getSubVolumeInfo(ctx context.Context, cr *util.Credentials, volID volumeID) (Subvolume, error) {
info := Subvolume{}
err := execCommandJSON(
ctx,
&info,
"ceph",
"fs",
"subvolume",
"info",
vo.FsName,
string(volID),
"--group_name",
vo.SubvolumeGroup,
"-m", vo.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--keyfile="+cr.KeyFile)
func (vo *volumeOptions) getSubVolumeInfo(ctx context.Context, volID volumeID) (*Subvolume, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
util.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", vo.FsName, err)
return nil, err
}
info, err := fsa.SubVolumeInfo(vo.FsName, vo.SubvolumeGroup, string(volID))
if err != nil {
util.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", string(volID), err)
if strings.HasPrefix(err.Error(), volumeNotFound) {
return info, ErrVolumeNotFound
return nil, ErrVolumeNotFound
}
// Incase the error is other than invalid command return error to the caller.
if !strings.Contains(err.Error(), invalidCommand) {
return info, ErrInvalidCommand
return nil, ErrInvalidCommand
}
return info, err
return nil, err
}
return info, nil
bc, ok := info.BytesQuota.(fsAdmin.ByteCount)
if !ok {
// info.BytesQuota == Infinite
return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", string(volID), info.BytesQuota)
}
subvol := Subvolume{
BytesQuota: int64(bc),
Path: info.Path,
Features: make([]string, len(info.Features)),
}
for i, feature := range info.Features {
subvol.Features[i] = string(feature)
}
return &subvol, nil
}
type localClusterState struct {

View File

@ -250,7 +250,6 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
vi util.CSIIdentifier
volOptions volumeOptions
vid volumeIdentifier
info Subvolume
)
// Decode the VolID first, to detect older volumes or pre-provisioned volumes
@ -339,7 +338,7 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
volOptions.ProvisionVolume = true
info, err = volOptions.getSubVolumeInfo(ctx, cr, volumeID(vid.FsSubvolName))
info, err := volOptions.getSubVolumeInfo(ctx, volumeID(vid.FsSubvolName))
if err == nil {
volOptions.RootPath = info.Path
volOptions.Features = info.Features
@ -525,7 +524,7 @@ func newSnapshotOptionsFromID(ctx context.Context, snapID string, cr *util.Crede
sid.FsSnapshotName = imageAttributes.ImageName
sid.FsSubvolName = imageAttributes.SourceName
subvolInfo, err := volOptions.getSubVolumeInfo(ctx, cr, volumeID(sid.FsSubvolName))
subvolInfo, err := volOptions.getSubVolumeInfo(ctx, volumeID(sid.FsSubvolName))
if err != nil {
return &volOptions, nil, &sid, err
}