rbd: add support for rbd striping

RBD supports creating rbd images with
object size, stripe unit and stripe count
to support striping. This PR adds the support
for the same.

More details about striping at
https://docs.ceph.com/en/quincy/man/8/rbd/#striping

fixes: #3124

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2022-05-24 09:08:42 +05:30
committed by mergify[bot]
parent 8f99fe7250
commit 4b57cc3ec5
7 changed files with 447 additions and 36 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util"
@ -94,6 +95,43 @@ func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.Crea
return err
}
err = validateStriping(req.Parameters)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
return nil
}
func validateStriping(parameters map[string]string) error {
stripeUnit := parameters["stripeUnit"]
stripeCount := parameters["stripeCount"]
if stripeUnit != "" && stripeCount == "" {
return errors.New("stripeCount must be specified when stripeUnit is specified")
}
if stripeUnit == "" && stripeCount != "" {
return errors.New("stripeUnit must be specified when stripeCount is specified")
}
objectSize := parameters["objectSize"]
if objectSize != "" {
objSize, err := strconv.ParseUint(objectSize, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse objectSize %s: %w", objectSize, err)
}
// check objectSize is power of 2
/*
Take 2^3=8 for example.
x & (x-1)
8 & 7
1000 & 0111 = 0000
*/
if objSize == 0 || (objSize&(objSize-1)) != 0 {
return fmt.Errorf("objectSize %s is not power of 2", objectSize)
}
}
return nil
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import "testing"
func TestValidateStriping(t *testing.T) {
t.Parallel()
tests := []struct {
name string
parameters map[string]string
wantErr bool
}{
{
name: "when stripeUnit is not specified",
parameters: map[string]string{
"stripeUnit": "",
"stripeCount": "10",
"objectSize": "2",
},
wantErr: true,
},
{
name: "when stripeCount is not specified",
parameters: map[string]string{
"stripeUnit": "4096",
"stripeCount": "",
"objectSize": "2",
},
wantErr: true,
},
{
name: "when objectSize is not power of 2",
parameters: map[string]string{
"stripeUnit": "4096",
"stripeCount": "8",
"objectSize": "3",
},
wantErr: true,
},
{
name: "when objectSize is 0",
parameters: map[string]string{
"stripeUnit": "4096",
"stripeCount": "8",
"objectSize": "0",
},
wantErr: true,
},
{
name: "when valid stripe parameters are specified",
parameters: map[string]string{
"stripeUnit": "4096",
"stripeCount": "8",
"objectSize": "131072",
},
wantErr: false,
},
{
name: "when no stripe parameters are specified",
parameters: map[string]string{},
wantErr: false,
},
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
t.Parallel()
if err := validateStriping(ts.parameters); (err != nil) != ts.wantErr {
t.Errorf("validateStriping() error = %v, wantErr %v", err, ts.wantErr)
}
})
}
}

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
@ -99,6 +100,11 @@ type rbdImage struct {
// VolSize is the size of the RBD image backing this rbdImage.
VolSize int64
// image striping configurations.
StripeCount uint64
StripeUnit uint64
ObjectSize uint64
Monitors string
// JournalPool is the ceph pool in which the CSI Journal/CSI snapshot Journal is
// stored
@ -408,27 +414,19 @@ func (rs *rbdSnapshot) String() string {
// createImage creates a new ceph image with provision and volume options.
func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error {
volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(pOpts.VolSize))
options := librbd.NewRbdImageOptions()
logMsg := "rbd: create %s size %s (features: %s) using mon %s"
if pOpts.DataPool != "" {
logMsg += fmt.Sprintf(", data pool %s", pOpts.DataPool)
err := options.SetString(librbd.RbdImageOptionDataPool, pOpts.DataPool)
if err != nil {
return fmt.Errorf("failed to set data pool: %w", err)
}
}
log.DebugLog(ctx, logMsg,
log.DebugLog(ctx, "rbd: create %s size %s (features: %s) using mon %s",
pOpts, volSzMiB, pOpts.ImageFeatureSet.Names(), pOpts.Monitors)
if pOpts.ImageFeatureSet != 0 {
err := options.SetUint64(librbd.RbdImageOptionFeatures, uint64(pOpts.ImageFeatureSet))
if err != nil {
return fmt.Errorf("failed to set image features: %w", err)
}
options := librbd.NewRbdImageOptions()
defer options.Destroy()
err := pOpts.setImageOptions(ctx, options)
if err != nil {
return err
}
err := pOpts.Connect(cr)
err = pOpts.Connect(cr)
if err != nil {
return err
}
@ -1280,9 +1278,40 @@ func genVolFromVolumeOptions(
rbdVol.Mounter)
rbdVol.DisableInUseChecks = disableInUseChecks
err = rbdVol.setStripeConfiguration(volOptions)
if err != nil {
return nil, err
}
return rbdVol, nil
}
func (ri *rbdImage) setStripeConfiguration(options map[string]string) error {
var err error
if val, ok := options["stripeUnit"]; ok {
ri.StripeUnit, err = strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse stripeUnit %s: %w", val, err)
}
}
if val, ok := options["stripeCount"]; ok {
ri.StripeCount, err = strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse stripeCount %s: %w", val, err)
}
}
if val, ok := options["objectSize"]; ok {
ri.ObjectSize, err = strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse objectSize %s: %w", val, err)
}
}
return nil
}
func (rv *rbdVolume) validateImageFeatures(imageFeatures string) error {
// It is possible for image features to be an empty string which
// the Go split function would return a single item array with
@ -1384,7 +1413,8 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot(
parentVol *rbdVolume,
) error {
var err error
logMsg := "rbd: clone %s %s (features: %s) using mon %s"
log.DebugLog(ctx, "rbd: clone %s %s (features: %s) using mon %s",
pSnapOpts, rv, rv.ImageFeatureSet.Names(), rv.Monitors)
err = parentVol.openIoctx()
if err != nil {
@ -1397,30 +1427,15 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot(
options := librbd.NewRbdImageOptions()
defer options.Destroy()
if rv.DataPool != "" {
logMsg += fmt.Sprintf(", data pool %s", rv.DataPool)
err = options.SetString(librbd.RbdImageOptionDataPool, rv.DataPool)
if err != nil {
return fmt.Errorf("failed to set data pool: %w", err)
}
}
log.DebugLog(ctx, logMsg,
pSnapOpts, rv, rv.ImageFeatureSet.Names(), rv.Monitors)
if rv.ImageFeatureSet != 0 {
err = options.SetUint64(librbd.RbdImageOptionFeatures, uint64(rv.ImageFeatureSet))
if err != nil {
return fmt.Errorf("failed to set image features: %w", err)
}
err = rv.setImageOptions(ctx, options)
if err != nil {
return err
}
err = options.SetUint64(librbd.ImageOptionCloneFormat, 2)
if err != nil {
return fmt.Errorf("failed to set image features: %w", err)
return err
}
// As the clone is yet to be created, open the Ioctx.
err = rv.openIoctx()
if err != nil {
@ -1461,6 +1476,52 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot(
return nil
}
// setImageOptions sets the image options.
func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageOptions) error {
var err error
logMsg := fmt.Sprintf("setting image options on %s", rv)
if rv.DataPool != "" {
logMsg += fmt.Sprintf(", data pool %s", rv.DataPool)
err = options.SetString(librbd.RbdImageOptionDataPool, rv.DataPool)
if err != nil {
return fmt.Errorf("failed to set data pool: %w", err)
}
}
if rv.ImageFeatureSet != 0 {
err = options.SetUint64(librbd.RbdImageOptionFeatures, uint64(rv.ImageFeatureSet))
if err != nil {
return fmt.Errorf("failed to set image features: %w", err)
}
}
if rv.StripeCount != 0 {
logMsg += fmt.Sprintf(", stripe count %d, stripe unit %d", rv.StripeCount, rv.StripeUnit)
err = options.SetUint64(librbd.RbdImageOptionStripeCount, rv.StripeCount)
if err != nil {
return fmt.Errorf("failed to set stripe count: %w", err)
}
err = options.SetUint64(librbd.RbdImageOptionStripeUnit, rv.StripeUnit)
if err != nil {
return fmt.Errorf("failed to set stripe unit: %w", err)
}
}
if rv.ObjectSize != 0 {
order := uint64(math.Log2(float64(rv.ObjectSize)))
logMsg += fmt.Sprintf(", object size %d, order %d", rv.ObjectSize, order)
err = options.SetUint64(librbd.RbdImageOptionOrder, order)
if err != nil {
return fmt.Errorf("failed to set object size: %w", err)
}
}
log.DebugLog(ctx, logMsg)
return nil
}
// getImageInfo queries rbd about the given image and returns its metadata, and returns
// ErrImageNotFound if provided image is not found.
func (ri *rbdImage) getImageInfo() error {