rbd: set scheduling interval on snapshot mirrored image

Mirror-snapshots can also be automatically created on a
periodic basis if mirror-snapshot schedules are defined.
The mirror-snapshot can be scheduled globally, per-pool,
or per-image levels. Multiple mirror-snapshot schedules
can be defined at any level.

To create a mirror-snapshot schedule with rbd, specify
the mirror snapshot schedule add command along with an
optional pool or image name; interval; and optional start time:

The interval can be specified in days, hours, or minutes
using d, h, m suffix respectively. The optional start-time
can be specified using the ISO 8601 time format. For example:

```
$ rbd --cluster site-a mirror snapshot schedule
  add --pool image-pool --image image1 24h 14:00:00-05:00
```

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-06-28 17:08:42 +05:30 committed by mergify[bot]
parent b1710f4c53
commit 0837c05be0
3 changed files with 277 additions and 0 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
@ -1733,3 +1734,41 @@ func (rs *rbdSnapshot) isCompatibleThickProvision(dst *rbdVolume) error {
return nil
}
func (ri *rbdImage) addSnapshotScheduling(
interval admin.Interval,
startTime admin.StartTime) error {
ls := admin.NewLevelSpec(ri.Pool, ri.RadosNamespace, ri.RbdImageName)
ra, err := ri.conn.GetRBDAdmin()
if err != nil {
return err
}
adminConn := ra.MirrorSnashotSchedule()
// list all the snapshot scheduling and check at least one image scheduling
// exists with specified interval.
ssList, err := adminConn.List(ls)
if err != nil {
return err
}
for _, ss := range ssList {
// make sure we are matching image level scheduling. The
// `adminConn.List` lists the global level scheduling also.
if ss.Name == ri.String() {
for _, s := range ss.Schedule {
// TODO: Add support to check start time also.
// The start time is currently stored with different format
// in ceph. Comparison is not possible unless we know in
// which format ceph is storing it.
if s.Interval == interval {
return err
}
}
}
}
err = adminConn.Add(ls, interval, startTime)
if err != nil {
return err
}
return nil
}

View File

@ -19,12 +19,14 @@ package rbd
import (
"context"
"errors"
"regexp"
"strconv"
"strings"
"github.com/ceph/ceph-csi/internal/util"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/csi-addons/spec/lib/go/replication"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -56,6 +58,18 @@ const (
imageMirroringKey = "mirroringMode"
// forceKey + key to get the force option from parameters.
forceKey = "force"
// schedulingIntervalKey to get the schedulingInterval from the
// parameters.
// Interval of time between scheduled snapshots. Typically in the form
// <num><m,h,d>.
schedulingIntervalKey = "schedulingInterval"
// schedulingStartTimeKey to get the schedulingStartTime from the
// parameters.
// (optional) StartTime is the time the snapshot schedule
// begins, can be specified using the ISO 8601 time format.
schedulingStartTimeKey = "schedulingStartTime"
)
// ReplicationServer struct of rbd CSI driver with supported methods of Replication
@ -108,6 +122,56 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
return mirroringMode, nil
}
// getSchedulingDetails gets the mirroring mode and scheduling details from the
// input GRPC request parameters and validates the scheduling is only supported
// for mirroring mode.
func getSchedulingDetails(parameters map[string]string) (admin.Interval, admin.StartTime, error) {
admInt := admin.NoInterval
adminStartTime := admin.NoStartTime
var err error
val := parameters[imageMirroringKey]
switch imageMirroringMode(val) {
case imageMirrorModeSnapshot:
default:
return admInt, adminStartTime, status.Error(codes.InvalidArgument, "scheduling is only supported for snapshot mode")
}
// validate mandatory interval field
interval, ok := parameters[schedulingIntervalKey]
if ok && interval == "" {
return admInt, adminStartTime, status.Error(codes.InvalidArgument, "scheduling interval cannot be empty")
}
adminStartTime = admin.StartTime(parameters[schedulingStartTimeKey])
if !ok {
// startTime is alone not supported it has to be present with interval
if adminStartTime != "" {
return admInt, admin.NoStartTime, status.Errorf(codes.InvalidArgument,
"%q parameter is supported only with %q",
schedulingStartTimeKey,
schedulingIntervalKey)
}
}
if interval != "" {
admInt, err = validateSchedulingInterval(interval)
if err != nil {
return admInt, admin.NoStartTime, status.Error(codes.InvalidArgument, err.Error())
}
}
return admInt, adminStartTime, nil
}
// validateSchedulingInterval return the interval as it is if its ending with
// `m|h|d` or else it will return error.
func validateSchedulingInterval(interval string) (admin.Interval, error) {
var re = regexp.MustCompile(`^[0-9]+[mhd]$`)
if re.MatchString(interval) {
return admin.Interval(interval), nil
}
return "", errors.New("interval specified without d, h, m suffix")
}
// EnableVolumeReplication extracts the RBD volume information from the
// volumeID, If the image is present it will enable the mirroring based on the
// user provided information.
@ -124,6 +188,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}
defer cr.DeleteCredentials()
interval, startTime, err := getSchedulingDetails(req.GetParameters())
if err != nil {
return nil, err
}
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
@ -162,6 +231,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}
}
if interval != "" {
err = rbdVol.addSnapshotScheduling(interval, startTime)
if err != nil {
return nil, err
}
util.DebugLog(
ctx,
"Added scheduling at interval %s, start time %s for volume %s",
interval,
startTime,
rbdVol)
}
return &replication.EnableVolumeReplicationResponse{}, nil
}

View File

@ -0,0 +1,155 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"reflect"
"testing"
"github.com/ceph/go-ceph/rbd/admin"
)
func TestValidateSchedulingInterval(t *testing.T) {
t.Parallel()
tests := []struct {
name string
interval string
want admin.Interval
wantErr bool
}{
{
"valid interval in minutes",
"3m",
admin.Interval("3m"),
false,
},
{
"valid interval in hour",
"22h",
admin.Interval("22h"),
false,
},
{
"valid interval in days",
"13d",
admin.Interval("13d"),
false,
},
{
"invalid interval without number",
"d",
admin.Interval(""),
true,
},
{
"invalid interval without (m|h|d) suffix",
"12",
admin.Interval(""),
true,
},
}
for _, tt := range tests {
st := tt
t.Run(tt.name, func(t *testing.T) {
got, err := validateSchedulingInterval(st.interval)
if (err != nil) != st.wantErr {
t.Errorf("validateSchedulingInterval() error = %v, wantErr %v", err, st.wantErr)
return
}
if !reflect.DeepEqual(got, st.want) {
t.Errorf("validateSchedulingInterval() = %v, want %v", got, st.want)
}
})
}
}
func TestGetSchedulingDetails(t *testing.T) {
tests := []struct {
name string
parameters map[string]string
wantInterval admin.Interval
wantStartTime admin.StartTime
wantErr bool
}{
{
"valid parameters",
map[string]string{
imageMirroringKey: string(imageMirrorModeSnapshot),
schedulingIntervalKey: "1h",
schedulingStartTimeKey: "14:00:00-05:00",
},
admin.Interval("1h"),
admin.StartTime("14:00:00-05:00"),
false,
},
{
"valid parameters when optional startTime is missing",
map[string]string{
imageMirroringKey: string(imageMirrorModeSnapshot),
schedulingIntervalKey: "1h",
},
admin.Interval("1h"),
admin.NoStartTime,
false,
},
{
"when mirroring mode is journal",
map[string]string{
imageMirroringKey: "journal",
schedulingIntervalKey: "1h",
},
admin.NoInterval,
admin.NoStartTime,
true,
},
{
"when startTime is specified without interval",
map[string]string{
imageMirroringKey: string(imageMirrorModeSnapshot),
schedulingStartTimeKey: "14:00:00-05:00",
},
admin.NoInterval,
admin.NoStartTime,
true,
},
{
"when no scheduling is specified",
map[string]string{
imageMirroringKey: string(imageMirrorModeSnapshot),
},
admin.NoInterval,
admin.NoStartTime,
false,
},
}
for _, tt := range tests {
st := tt
t.Run(tt.name, func(t *testing.T) {
interval, startTime, err := getSchedulingDetails(st.parameters)
if (err != nil) != st.wantErr {
t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, st.wantErr)
return
}
if !reflect.DeepEqual(interval, st.wantInterval) {
t.Errorf("getSchedulingDetails() interval = %v, want %v", interval, st.wantInterval)
}
if !reflect.DeepEqual(startTime, st.wantStartTime) {
t.Errorf("getSchedulingDetails() startTime = %v, want %v", startTime, st.wantStartTime)
}
})
}
}