From 6e941539b5c918dde0fe22d0893c3b56be59809d Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Thu, 11 Feb 2021 15:31:38 +0530 Subject: [PATCH] rbd: implement volume replication spec implemented the volume replication spec for the rbd mirroring. Signed-off-by: Madhu Rajanna --- internal/rbd/controllerserver.go | 6 + internal/rbd/replicationcontrollerserver.go | 357 ++++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 internal/rbd/replicationcontrollerserver.go diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 3a41e9792..a13365777 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -28,6 +28,7 @@ import ( librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kube-storage/spec/lib/go/replication" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -41,6 +42,11 @@ const ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer + // added UnimplementedControllerServer as a member of + // ControllerServer. if replication spec add more RPC services in the proto + // file, then we don't need to add all RPC methods leading to forward + // compatibility. + *replication.UnimplementedControllerServer // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/rbd/replicationcontrollerserver.go new file mode 100644 index 000000000..9645aaef8 --- /dev/null +++ b/internal/rbd/replicationcontrollerserver.go @@ -0,0 +1,357 @@ +/* +Copyright 2021 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "context" + "errors" + "strconv" + + "github.com/ceph/ceph-csi/internal/util" + + librbd "github.com/ceph/go-ceph/rbd" + "github.com/kube-storage/spec/lib/go/replication" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// imageMirroringMode is used to indicate the mirroring mode for an RBD image. +type imageMirroringMode string + +const ( + // imageMirrorModeSnapshot uses snapshots to propagate RBD images between + // ceph clusters. + imageMirrorModeSnapshot imageMirroringMode = "snapshot" +) + +// imageMirroringState represents the image mirroring state. +type imageMirroringState string + +const ( + // If the state is up+replaying, then mirroring is functioning properly. + // up means the rbd-mirror daemon is running, and replaying means + // this image is the target for replication from another storage cluster. + upAndReplaying imageMirroringState = "up+replaying" + // If the state is up+stopped means the rbd-mirror daemon is + // running and stopped means the image is not a target for replication from + // another cluster + upAndStopped imageMirroringState = "up+stopped" +) + +const ( + // mirroringMode + key to get the imageMirroringMode from parameters. + imageMirroringKey = "mirroringMode" + // forceKey + key to get the force option from parameters. + forceKey = "force" +) + +// getVolumeFromID gets the rbd image details from the volumeID. +// TODO: move this to controllerserver.go and reuse it wherever its applicable. +func (cs *ControllerServer) getVolumeFromID(ctx context.Context, volumeID string, secrets map[string]string) (*rbdVolume, *util.Credentials, error) { + // validate the volume ID + cr, err := util.NewUserCredentials(secrets) + if err != nil { + return nil, nil, status.Error(codes.Internal, err.Error()) + } + + if volumeID == "" { + return nil, cr, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + + if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, cr, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + + var rbdVol = &rbdVolume{} + rbdVol, err = genVolFromVolID(ctx, volumeID, cr, secrets) + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } + } + return rbdVol, cr, err +} + +// getForceOption extracts the force option from the GRPC request parameters. +// If not set, the default will be set to false. +func getForceOption(ctx context.Context, parameters map[string]string) (bool, error) { + val, ok := parameters[forceKey] + if !ok { + util.WarningLog(ctx, "%s is not set in parameters, setting to default (%v)", forceKey, false) + return false, nil + } + force, err := strconv.ParseBool(val) + if err != nil { + return false, status.Errorf(codes.Internal, err.Error()) + } + return force, nil +} + +// getMirroringMode gets the mirroring mode from the input GRPC request parameters. +// mirroringMode is the key to check the mode in the parameters. +func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) { + val, ok := parameters[imageMirroringKey] + if !ok { + util.WarningLog(ctx, "%s is not set in parameters, setting to mirroringMode to default (%s)", imageMirroringKey, imageMirrorModeSnapshot) + return librbd.ImageMirrorModeSnapshot, nil + } + + var mirroringMode librbd.ImageMirrorMode + switch imageMirroringMode(val) { + case imageMirrorModeSnapshot: + mirroringMode = librbd.ImageMirrorModeSnapshot + default: + return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val) + } + return mirroringMode, nil +} + +// cleanup performs below resource cleanup operations. +func (cs *ControllerServer) cleanup(rbdVol *rbdVolume, cr *util.Credentials) { + if cr != nil { + // destroy the credential file + cr.DeleteCredentials() + } + if rbdVol != nil { + // release the volume lock + cs.VolumeLocks.Release(rbdVol.VolID) + // destroy the cluster connection + rbdVol.Destroy() + } +} + +// EnableVolumeReplication extracts the RBD volume information from the +// volumeID, If the image is present it will enable the mirroring based on the +// user provided information. +// TODO: create new Replication controller struct for the replication operations. +func (cs *ControllerServer) EnableVolumeReplication(ctx context.Context, + req *replication.EnableVolumeReplicationRequest, +) (*replication.EnableVolumeReplicationResponse, error) { + rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) + defer cs.cleanup(rbdVol, cr) + if err != nil { + return nil, err + } + // extract the mirroring mode + mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) + if err != nil { + return nil, err + } + + mirroringInfo, err := rbdVol.getImageMirroringInfo() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + if mirroringInfo.State != librbd.MirrorImageEnabled { + err = rbdVol.enableImageMirroring(mirroringMode) + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + } + return &replication.EnableVolumeReplicationResponse{}, nil +} + +// DisableVolumeReplication extracts the RBD volume information from the +// volumeID, If the image is present and the mirroring is enabled on the RBD +// image it will disable the mirroring. +func (cs *ControllerServer) DisableVolumeReplication(ctx context.Context, + req *replication.DisableVolumeReplicationRequest, +) (*replication.DisableVolumeReplicationResponse, error) { + rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) + defer cs.cleanup(rbdVol, cr) + if err != nil { + return nil, err + } + + // extract the force option + force, err := getForceOption(ctx, req.GetParameters()) + if err != nil { + return nil, err + } + + mirroringInfo, err := rbdVol.getImageMirroringInfo() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + switch mirroringInfo.State { + case librbd.MirrorImageEnabled: + if !force && !mirroringInfo.Primary { + return nil, status.Error(codes.InvalidArgument, "image is in non-primary state") + } + err = rbdVol.disableImageMirroring(force) + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + // TODO: check do we need to return abort after disabling the mirroring + return &replication.DisableVolumeReplicationResponse{}, nil + case librbd.MirrorImageDisabled: + // image is already in disabled state + default: + // TODO: use string instead of int for returning valid error message + return nil, status.Errorf(codes.InvalidArgument, "image is in %d Mode", mirroringInfo.State) + } + + return &replication.DisableVolumeReplicationResponse{}, nil +} + +// PromoteVolume extracts the RBD volume information from the volumeID, If the +// image is present, mirroring is enabled and the image is in demoted state it +// will promote the volume as primary. +// If the image is already primary it will return success. +func (cs *ControllerServer) PromoteVolume(ctx context.Context, + req *replication.PromoteVolumeRequest, +) (*replication.PromoteVolumeResponse, error) { + rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) + defer cs.cleanup(rbdVol, cr) + if err != nil { + return nil, err + } + + // extract the force option + force, err := getForceOption(ctx, req.GetParameters()) + if err != nil { + return nil, err + } + + mirroringInfo, err := rbdVol.getImageMirroringInfo() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + if mirroringInfo.State != librbd.MirrorImageEnabled { + return nil, status.Errorf(codes.InvalidArgument, "mirroring is not enabled on %s, image is in %d Mode", rbdVol.VolID, mirroringInfo.State) + } + + // promote secondary to primary + if !mirroringInfo.Primary { + err = rbdVol.promoteImage(force) + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + } + + return &replication.PromoteVolumeResponse{}, nil +} + +// DemoteVolume extracts the RBD volume information from the +// volumeID, If the image is present, mirroring is enabled and the +// image is in promoted state it will demote the volume as secondary. +// If the image is already secondary it will return success. +func (cs *ControllerServer) DemoteVolume(ctx context.Context, + req *replication.DemoteVolumeRequest, +) (*replication.DemoteVolumeResponse, error) { + rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) + defer cs.cleanup(rbdVol, cr) + if err != nil { + return nil, err + } + + mirroringInfo, err := rbdVol.getImageMirroringInfo() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + if mirroringInfo.State != librbd.MirrorImageEnabled { + return nil, status.Errorf(codes.InvalidArgument, "mirroring is not enabled on %s, image is in %d Mode", rbdVol.VolID, mirroringInfo.State) + } + + // demote image to secondary + if mirroringInfo.Primary { + err = rbdVol.demoteImage() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + } + return &replication.DemoteVolumeResponse{}, nil +} + +// ResyncVolume extracts the RBD volume information from the volumeID, If the +// image is present, mirroring is enabled and the image is in demoted state. +// If yes it will resync the image to correct the split-brain. +func (cs *ControllerServer) ResyncVolume(ctx context.Context, + req *replication.ResyncVolumeRequest, +) (*replication.ResyncVolumeResponse, error) { + rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) + defer cs.cleanup(rbdVol, cr) + if err != nil { + return nil, err + } + + mirroringInfo, err := rbdVol.getImageMirroringInfo() + if err != nil { + // TODO: check if the image is not found return valid error message + // in case of Resync the image will get deleted and gets recreated and + // it takes time for this operation + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + if mirroringInfo.State != librbd.MirrorImageEnabled { + return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") + } + + // return error if the image is still primary + if mirroringInfo.Primary { + return nil, status.Error(codes.InvalidArgument, "image is in primary state") + } + + err = rbdVol.resyncImage() + if err != nil { + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + // TODO: check the image state and return its ready to use or not + mirrorStatus, err := rbdVol.getImageMirroingStatus() + if err != nil { + // the image gets recreated after issuing resync in that case return + // volume as not ready. + if errors.Is(err, ErrImageNotFound) { + resp := &replication.ResyncVolumeResponse{ + Ready: false, + } + return resp, nil + } + util.ErrorLog(ctx, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + ready := false + state := imageMirroringState(mirrorStatus.State) + if state == upAndStopped || state == upAndReplaying { + ready = true + } + util.UsefulLog(ctx, "image mirroring state=%s, description=%s and lastUpdate=%s", mirrorStatus.State, mirrorStatus.Description, mirrorStatus.LastUpdate) + resp := &replication.ResyncVolumeResponse{ + Ready: ready, + } + return resp, nil +}