rbd: use go-ceph API for creating RBD images

This is the initial step for improving performance during provisioning
of CSI volumes backed by RBD.

While creating a volume, an existing connection to the Ceph cluster is
used from the ConnPool. This should speed up the creation of a batch of
volumes significantly.

Updates: #449
Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2020-01-07 14:45:52 +01:00 committed by mergify[bot]
parent 90f81516ee
commit 8dc3600899
2 changed files with 96 additions and 13 deletions

View File

@ -125,6 +125,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, err return nil, err
} }
// TODO: create/get a connection from the the ConnPool, and do not pass
// the credentials to any of the utility functions.
cr, err := util.NewUserCredentials(req.GetSecrets()) cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -135,6 +137,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rbdVol.Destroy()
// Existence and conflict checks // Existence and conflict checks
if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired { if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
@ -294,6 +297,7 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del
defer cs.VolumeLocks.Release(volumeID) defer cs.VolumeLocks.Release(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok { if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.V(3).Infof(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err) klog.V(3).Infof(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err)
@ -352,6 +356,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
defer cs.VolumeLocks.Release(volumeID) defer cs.VolumeLocks.Release(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
if err = genVolFromVolID(ctx, rbdVol, volumeID, cr, req.GetSecrets()); err != nil { if err = genVolFromVolID(ctx, rbdVol, volumeID, cr, req.GetSecrets()); err != nil {
if _, ok := err.(util.ErrPoolNotFound); ok { if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err) klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err)
@ -742,6 +747,7 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
err = genVolFromVolID(ctx, rbdVol, volID, cr, req.GetSecrets()) err = genVolFromVolID(ctx, rbdVol, volID, cr, req.GetSecrets())
if err != nil { if err != nil {
if _, ok := err.(ErrImageNotFound); ok { if _, ok := err.(ErrImageNotFound); ok {

View File

@ -30,6 +30,8 @@ import (
"github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/ceph-csi/pkg/util"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"github.com/pborman/uuid" "github.com/pborman/uuid"
@ -40,7 +42,7 @@ import (
const ( const (
imageWatcherStr = "watcher=" imageWatcherStr = "watcher="
rbdImageFormat2 = "2" rbdImageFormat2 = 2
// The following three values are used for 30 seconds timeout // The following three values are used for 30 seconds timeout
// while waiting for RBD Watcher to expire. // while waiting for RBD Watcher to expire.
rbdImageWatcherInitDelay = 1 * time.Second rbdImageWatcherInitDelay = 1 * time.Second
@ -88,6 +90,9 @@ type rbdVolume struct {
DisableInUseChecks bool `json:"disableInUseChecks"` DisableInUseChecks bool `json:"disableInUseChecks"`
Encrypted bool Encrypted bool
KMS util.EncryptionKMS KMS util.EncryptionKMS
// connection
conn *rados.Conn
} }
// rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics // rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics
@ -112,36 +117,93 @@ type rbdSnapshot struct {
var ( var (
supportedFeatures = sets.NewString("layering") supportedFeatures = sets.NewString("layering")
// large interval and timeout, it should be longer than the maximum
// time an operation can take (until refcounting of the connections is
// available)
cpInterval = 15 * time.Minute
cpExpiry = 10 * time.Minute
connPool = util.NewConnPool(cpInterval, cpExpiry)
) )
// createImage creates a new ceph image with provision and volume options. // createImage creates a new ceph image with provision and volume options.
func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error { func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error {
var output []byte
image := pOpts.RbdImageName
volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(pOpts.VolSize)) volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(pOpts.VolSize))
options := librbd.NewRbdImageOptions()
logMsg := "rbd: create %s size %s format 2 (features: %s) using mon %s, pool %s " var err error
imageFormat := rbdImageFormat2
if pOpts.ImageFormat != "" {
imageFormat, err = strconv.Atoi(pOpts.ImageFormat)
if err != nil {
return errors.Wrapf(err, "failed to convert ImageFormat (%v) to integer", pOpts.ImageFormat)
}
}
err = options.SetUint64(librbd.RbdImageOptionFormat, uint64(imageFormat))
if err != nil {
return errors.Wrapf(err, "failed to set ImageFormat to %v", imageFormat)
}
logMsg := "rbd: create %s size %s format %d (features: %s) using mon %s, pool %s "
if pOpts.DataPool != "" { if pOpts.DataPool != "" {
logMsg += fmt.Sprintf("data pool %s", pOpts.DataPool) logMsg += fmt.Sprintf("data pool %s", pOpts.DataPool)
err = options.SetString(librbd.RbdImageOptionDataPool, pOpts.DataPool)
if err != nil {
return errors.Wrapf(err, "failed to set data pool")
}
} }
klog.V(4).Infof(util.Log(ctx, logMsg), klog.V(4).Infof(util.Log(ctx, logMsg),
image, volSzMiB, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool) pOpts.RbdImageName, volSzMiB, imageFormat, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool)
args := []string{"create", image, "--size", volSzMiB, "--pool", pOpts.Pool, "--id", cr.ID, "-m", pOpts.Monitors, "--keyfile=" + cr.KeyFile, "--image-feature", pOpts.ImageFeatures} if pOpts.ImageFeatures != "" {
features := imageFeaturesToUint64(ctx, pOpts.ImageFeatures)
if pOpts.DataPool != "" { err := options.SetUint64(librbd.RbdImageOptionFeatures, features)
args = append(args, "--data-pool", pOpts.DataPool) if err != nil {
return errors.Wrapf(err, "failed to set image features")
}
} }
output, err := execCommand("rbd", args)
ioctx, err := pOpts.getIoctx(cr)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to create rbd image, command output: %s", string(output)) return errors.Wrapf(err, "failed to get IOContext")
}
defer ioctx.Destroy()
err = librbd.CreateImage(ioctx, pOpts.RbdImageName,
uint64(util.RoundOffVolSize(pOpts.VolSize)*util.MiB), options)
if err != nil {
return errors.Wrapf(err, "failed to create rbd image")
} }
return nil return nil
} }
func (rv *rbdVolume) getIoctx(cr *util.Credentials) (*rados.IOContext, error) {
if rv.conn == nil {
conn, err := connPool.Get(rv.Pool, rv.Monitors, cr.KeyFile)
if err != nil {
return nil, errors.Wrapf(err, "failed to get connection")
}
rv.conn = conn
}
ioctx, err := rv.conn.OpenIOContext(rv.Pool)
if err != nil {
connPool.Put(rv.conn)
return nil, errors.Wrapf(err, "failed to open IOContext for pool %s", rv.Pool)
}
return ioctx, nil
}
func (rv *rbdVolume) Destroy() {
if rv.conn != nil {
connPool.Put(rv.conn)
}
}
// rbdStatus checks if there is watcher on the image. // rbdStatus checks if there is watcher on the image.
// It returns true if there is a watcher on the image, otherwise returns false. // It returns true if there is a watcher on the image, otherwise returns false.
func rbdStatus(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) (bool, string, error) { func rbdStatus(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) (bool, string, error) {
@ -270,7 +332,7 @@ func updateVolWithImageInfo(ctx context.Context, rbdVol *rbdVolume, cr *util.Cre
return fmt.Errorf("unknown or unsupported image format (%d) returned for image (%s)", return fmt.Errorf("unknown or unsupported image format (%d) returned for image (%s)",
imageInfo.Format, rbdVol.RbdImageName) imageInfo.Format, rbdVol.RbdImageName)
} }
rbdVol.ImageFormat = rbdImageFormat2 rbdVol.ImageFormat = strconv.Itoa(rbdImageFormat2)
rbdVol.VolSize = imageInfo.Size rbdVol.VolSize = imageInfo.Size
rbdVol.ImageFeatures = strings.Join(imageInfo.Features, ",") rbdVol.ImageFeatures = strings.Join(imageInfo.Features, ",")
@ -558,6 +620,21 @@ func hasSnapshotFeature(imageFeatures string) bool {
return false return false
} }
// imageFeaturesToUint64 takes the comma separated image features and converts
// them to a RbdImageOptionFeatures value.
func imageFeaturesToUint64(ctx context.Context, imageFeatures string) uint64 {
features := uint64(0)
for _, f := range strings.Split(imageFeatures, ",") {
if f == "layering" {
features |= librbd.RbdFeatureLayering
} else {
klog.Warningf(util.Log(ctx, "rbd: image feature %s not recognized, skipping"), f)
}
}
return features
}
func protectSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credentials) error { func protectSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credentials) error {
var output []byte var output []byte