rbd: add new controller to regenerate omap data

In the case of Disaster Recovery failover, the
user is expected to create static PVCs. We have
decided not to rely on the PVC name and namespace
for several reasons (in Kubernetes it is planned
to support transferring a PVC to a new namespace
with a different name, and new features such as
data populators are coming). For now, we plan to
go with static PVCs to support async mirroring.

During async mirroring only the RBD images are
mirrored to the secondary site, and when the user
creates the static PVCs on failover we need to
regenerate the omap data. The volumeHandle in the
PV spec is an encoded string that contains the
clusterID, poolID and image UUID. The clusterID
and poolID won't remain the same on both clusters,
so cephcsi needs to generate a new volumeHandle
and create a mapping between the new and the old
volumeHandle. With that mapping in place, whenever
cephcsi gets a CSI request it checks whether a
mapping exists; if it does, it picks up the new
volumeHandle and continues with the other
operations.
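
To make that concrete, here is a minimal, hypothetical sketch of the
lookup described above, with a plain map standing in for the omap that
the journal code in this commit maintains (the handles are placeholders,
not real encodings):

package main

import "fmt"

// resolveVolumeHandle is illustration only; cephcsi stores the mapping in a
// RADOS omap (see CheckNewUUIDMapping further down in this commit).
func resolveVolumeHandle(mapping map[string]string, handle string) string {
    // Key: volumeHandle from the statically created PV on the secondary.
    // Value: volumeHandle regenerated with the local clusterID and poolID.
    if newHandle, ok := mapping[handle]; ok {
        return newHandle
    }
    return handle
}

func main() {
    m := map[string]string{"old-handle": "new-handle"}
    fmt.Println(resolveVolumeHandle(m, "old-handle")) // prints "new-handle"
}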

The new controller watches for PVs being created.
It checks whether the omap exists, and if it does
not, it regenerates the entire omap data.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
Madhu Rajanna 2020-10-21 18:19:45 +05:30 committed by mergify[bot]
parent 5af3fe5deb
commit 68bd44beba
5 changed files with 471 additions and 21 deletions


@@ -0,0 +1,78 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller

import (
    "github.com/ceph/ceph-csi/internal/util"

    clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

// ContollerManager is the interface that wraps the Add function. New
// controllers that get added have to implement Add to get started by the
// manager.
type ContollerManager interface {
    Add(manager.Manager, Config) error
}

// Config holds the driver name and namespace name.
type Config struct {
    DriverName string
    Namespace  string
}

// ControllerList holds the list of managers that need to be started.
var ControllerList []ContollerManager

// addToManager calls the Add method of all registered managers.
func addToManager(mgr manager.Manager, config Config) error {
    for _, c := range ControllerList {
        err := c.Add(mgr, config)
        if err != nil {
            return err
        }
    }
    return nil
}

// Start will start all the registered managers.
func Start(config Config) error {
    electionID := config.DriverName + "-" + config.Namespace
    opts := manager.Options{
        LeaderElection: true,
        // disable metrics
        MetricsBindAddress:      "0",
        LeaderElectionNamespace: config.Namespace,
        LeaderElectionID:        electionID,
    }
    mgr, err := manager.New(clientConfig.GetConfigOrDie(), opts)
    if err != nil {
        util.ErrorLogMsg("failed to create manager %s", err)
        return err
    }
    err = addToManager(mgr, config)
    if err != nil {
        util.ErrorLogMsg("failed to add manager %s", err)
        return err
    }
    err = mgr.Start(signals.SetupSignalHandler())
    if err != nil {
        util.ErrorLogMsg("failed to start manager %s", err)
    }
    return err
}
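
As a usage note, and not part of this commit: a driver process would
register the PV controller and then start the manager roughly as below.
The import path of the persistentvolume package and the exact call site
in cephcsi's cmd package are assumptions here.

package driver

import (
    "github.com/ceph/ceph-csi/internal/controller"
    "github.com/ceph/ceph-csi/internal/controller/persistentvolume"
)

// startControllers is a hypothetical wiring example: it appends the
// PersistentVolume reconciler to controller.ControllerList and runs the
// controller-runtime manager with leader election enabled.
func startControllers(driverName, namespace string) {
    // Register the reconciler (adds it to controller.ControllerList).
    persistentvolume.Init()

    cfg := controller.Config{
        DriverName: driverName,
        Namespace:  namespace,
    }

    // Start blocks until the signal handler stops the manager, so run it in
    // its own goroutine when the driver has other work to do.
    go func() {
        if err := controller.Start(cfg); err != nil {
            panic(err) // placeholder error handling
        }
    }()
}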


@@ -0,0 +1,167 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume

import (
    "context"
    "errors"
    "fmt"

    ctrl "github.com/ceph/ceph-csi/internal/controller"
    "github.com/ceph/ceph-csi/internal/rbd"
    "github.com/ceph/ceph-csi/internal/util"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

// ReconcilePersistentVolume reconciles a PersistentVolume object.
type ReconcilePersistentVolume struct {
    client client.Client
    config ctrl.Config
}

var _ reconcile.Reconciler = &ReconcilePersistentVolume{}
var _ ctrl.ContollerManager = &ReconcilePersistentVolume{}

// Init will add the ReconcilePersistentVolume to the list.
func Init() {
    // add ReconcilePersistentVolume to the list
    ctrl.ControllerList = append(ctrl.ControllerList, ReconcilePersistentVolume{})
}

// Add adds the newPVReconciler.
func (r ReconcilePersistentVolume) Add(mgr manager.Manager, config ctrl.Config) error {
    return add(mgr, newPVReconciler(mgr, config))
}

// newPVReconciler returns a ReconcilePersistentVolume.
func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconciler {
    r := &ReconcilePersistentVolume{
        client: mgr.GetClient(),
        config: config,
    }
    return r
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
    // Create a new controller
    c, err := controller.New("persistentvolume-controller", mgr, controller.Options{MaxConcurrentReconciles: 1, Reconciler: r})
    if err != nil {
        return err
    }

    // Watch for changes to PersistentVolumes
    err = c.Watch(&source.Kind{Type: &corev1.PersistentVolume{}}, &handler.EnqueueRequestForObject{})
    if err != nil {
        return err
    }

    return nil
}

func (r *ReconcilePersistentVolume) getCredentials(name, namespace string) (map[string]string, error) {
    secret := &corev1.Secret{}
    err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, secret)
    if err != nil {
        return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", name, namespace, err)
    }
    credentials := map[string]string{}
    for key, value := range secret.Data {
        credentials[key] = string(value)
    }
    return credentials, nil
}

// reconcilePV will extract the image details from the pv spec and regenerate
// the omap data.
func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
    pv, ok := obj.(*corev1.PersistentVolume)
    if !ok {
        return nil
    }
    if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == r.config.DriverName {
        pool := pv.Spec.CSI.VolumeAttributes["pool"]
        journalPool := pv.Spec.CSI.VolumeAttributes["journalPool"]
        requestName := pv.Name
        imageName := pv.Spec.CSI.VolumeAttributes["imageName"]
        volumeHandler := pv.Spec.CSI.VolumeHandle
        secretName := ""
        secretNamespace := ""
        if pv.Spec.CSI.ControllerExpandSecretRef != nil {
            secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name
            secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace
        } else if pv.Spec.CSI.NodeStageSecretRef != nil {
            secretName = pv.Spec.CSI.NodeStageSecretRef.Name
            secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
        }
        if secretName == "" || secretNamespace == "" {
            errStr := "secret name or secret namespace is empty"
            util.ErrorLogMsg(errStr)
            return errors.New(errStr)
        }
        secrets, err := r.getCredentials(secretName, secretNamespace)
        if err != nil {
            util.ErrorLogMsg("failed to get secrets %s", err)
            return err
        }
        cr, err := util.NewUserCredentials(secrets)
        if err != nil {
            util.ErrorLogMsg("failed to get user credentials %s", err)
            return err
        }
        defer cr.DeleteCredentials()

        err = rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
        if err != nil {
            util.ErrorLogMsg("failed to regenerate journal %s", err)
            return err
        }
    }
    return nil
}

// Reconcile reconciles the PersistentVolume object and creates new omap
// entries for the volume.
func (r *ReconcilePersistentVolume) Reconcile(request reconcile.Request) (reconcile.Result, error) {
    pv := &corev1.PersistentVolume{}
    err := r.client.Get(context.TODO(), request.NamespacedName, pv)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }
    // Check if the object is under deletion
    if !pv.GetDeletionTimestamp().IsZero() {
        return reconcile.Result{}, nil
    }

    err = r.reconcilePV(pv)
    if err != nil {
        return reconcile.Result{}, err
    }

    return reconcile.Result{}, nil
}
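
For reference, the kind of statically created PV this reconciler acts on
could be built as the following Go object; the driver name, secret
reference, pool names and the volumeHandle are placeholder values, and
only the fields reconcilePV actually reads are filled in.

package example

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// staticPV sketches a PV a user could create on the failover cluster. The PV
// name becomes the omap request name, and VolumeHandle is the handle copied
// from the primary cluster that RegenerateJournal remaps.
func staticPV() *corev1.PersistentVolume {
    return &corev1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{Name: "pvc-placeholder-name"},
        Spec: corev1.PersistentVolumeSpec{
            PersistentVolumeSource: corev1.PersistentVolumeSource{
                CSI: &corev1.CSIPersistentVolumeSource{
                    Driver:       "rbd.csi.ceph.com",
                    VolumeHandle: "0001-0009-placeholder-clusterid-0000000000000002-placeholder-uuid",
                    VolumeAttributes: map[string]string{
                        "pool":        "replicapool",
                        "journalPool": "replicapool",
                        "imageName":   "csi-vol-placeholder-uuid",
                    },
                    NodeStageSecretRef: &corev1.SecretReference{
                        Name:      "csi-rbd-secret",
                        Namespace: "ceph-csi",
                    },
                },
            },
        },
    }
}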


@@ -434,9 +434,12 @@ func (conn *Connection) UndoReservation(ctx context.Context,
 	return err
 }
 
-// reserveOMapName creates an omap with passed in oMapNamePrefix and a generated <uuid>.
-// It ensures generated omap name does not already exist and if conflicts are detected, a set
-// number of retires with newer uuids are attempted before returning an error.
+// reserveOMapName creates an omap with passed in oMapNamePrefix and a
+// generated <uuid>. If the passed volUUID is not empty it will use it instead
+// of generating its own UUID and it will return an error immediately if the
+// omap already exists. If the passed volUUID is empty it ensures the generated
+// omap name does not already exist and, if conflicts are detected, a set
+// number of retries with newer UUIDs are attempted before returning an error.
 func reserveOMapName(ctx context.Context, monitors string, cr *util.Credentials, pool, namespace, oMapNamePrefix, volUUID string) (string, error) {
 	var iterUUID string
@@ -489,8 +492,8 @@ Input arguments:
 	- namePrefix: Prefix to use when generating the image/subvolume name (suffix is an auto-genetated UUID)
 	- parentName: Name of the parent image/subvolume if reservation is for a snapshot (optional)
 	- kmsConf: Name of the key management service used to encrypt the image (optional)
 	- volUUID: UUID need to be reserved instead of auto-generating one (this is
 	  useful for mirroring and metro-DR)
 
 Return values:
 	- string: Contains the UUID that was reserved for the passed in reqName
@@ -689,3 +692,43 @@ func (conn *Connection) Destroy() {
 	conn.monitors = ""
 	conn.cr = nil
 }
+
+// CheckNewUUIDMapping checks if there is any UUID mapping between the old
+// volumeHandle and the newly generated volumeHandle.
+func (conn *Connection) CheckNewUUIDMapping(ctx context.Context,
+	journalPool, volumeHandle string) (string, error) {
+	var cj = conn.config
+
+	// check if request name is already part of the directory omap
+	fetchKeys := []string{
+		cj.csiNameKeyPrefix + volumeHandle,
+	}
+	values, err := getOMapValues(
+		ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
+		cj.commonPrefix, fetchKeys)
+	if err != nil {
+		if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) {
+			// pool or omap (oid) was not present
+			// stop processing but without an error for no reservation exists
+			return "", nil
+		}
+		return "", err
+	}
+
+	return values[cj.csiNameKeyPrefix+volumeHandle], nil
+}
+
+// ReserveNewUUIDMapping creates the omap mapping between the oldVolumeHandle
+// and the newVolumeHandle. In case of async mirroring the PV is statically
+// created and will carry the oldVolumeHandle. The volumeHandle is composed of
+// the clusterID, poolID etc.; as the poolID and clusterID might be different
+// on the secondary cluster, cephcsi will generate the new mapping and keep it
+// for internal reference.
+func (conn *Connection) ReserveNewUUIDMapping(ctx context.Context,
+	journalPool, oldVolumeHandle, newVolumeHandle string) error {
+	var cj = conn.config
+
+	setKeys := map[string]string{
+		cj.csiNameKeyPrefix + oldVolumeHandle: newVolumeHandle,
+	}
+	return setOMapKeys(ctx, conn, journalPool, cj.namespace, cj.csiDirectory, setKeys)
+}


@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 )
 
@@ -466,3 +467,150 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
 	return err
 }
 
+// RegenerateJournal regenerates the omap data for the static volumes. The
+// input parameters imageName, volumeID, pool, journalPool and requestName
+// will be present in the PV.Spec.CSI object; based on that we can regenerate
+// the complete omap mapping between imageName and volumeID.
+//
+// RegenerateJournal performs the below operations:
+// - Extract information (clusterID, UUID) from the volumeID
+// - Get the pool ID from the pool name
+// - Reserve omap data
+// - Generate a new volumeHandle
+// - Create a mapping from the old volumeHandle to the new one
+//
+// The volumeHandle won't remain the same as it contains the poolID,
+// clusterID etc., which are not the same across clusters.
+func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) error {
+	ctx := context.Background()
+	var (
+		options map[string]string
+		vi      util.CSIIdentifier
+		rbdVol  *rbdVolume
+	)
+
+	options = make(map[string]string)
+
+	rbdVol = &rbdVolume{VolID: volumeID}
+
+	err := vi.DecomposeCSIID(rbdVol.VolID)
+	if err != nil {
+		return fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
+			ErrInvalidVolID, err, rbdVol.VolID)
+	}
+
+	// TODO check clusterID mapping exists
+	rbdVol.ClusterID = vi.ClusterID
+	options["clusterID"] = rbdVol.ClusterID
+
+	rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
+	if err != nil {
+		util.ErrorLog(ctx, "failed getting mons (%s)", err)
+		return err
+	}
+
+	rbdVol.Pool = pool
+	err = rbdVol.Connect(cr)
+	if err != nil {
+		return err
+	}
+	rbdVol.JournalPool = journalPool
+	if rbdVol.JournalPool == "" {
+		rbdVol.JournalPool = rbdVol.Pool
+	}
+	volJournal = journal.NewCSIVolumeJournal("default")
+	j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
+	if err != nil {
+		return err
+	}
+	defer j.Destroy()
+
+	journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
+	if err != nil {
+		return err
+	}
+
+	rbdVol.RequestName = requestName
+	// TODO add Nameprefix also
+
+	kmsID := ""
+	imageData, err := j.CheckReservation(
+		ctx, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
+	if err != nil {
+		return err
+	}
+	if imageData != nil {
+		rbdVol.ReservedID = imageData.ImageUUID
+		rbdVol.ImageID = imageData.ImageAttributes.ImageID
+		if rbdVol.ImageID == "" {
+			err = rbdVol.getImageID()
+			if err != nil {
+				util.ErrorLog(ctx, "failed to get image id %s: %v", rbdVol, err)
+				return err
+			}
+			err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID)
+			if err != nil {
+				util.ErrorLog(ctx, "failed to store volume id %s: %v", rbdVol, err)
+				return err
+			}
+		}
+		err = rbdVol.addNewUUIDMapping(ctx, imagePoolID, j)
+		if err != nil {
+			util.ErrorLog(ctx, "failed to add UUID mapping %s: %v", rbdVol, err)
+			return err
+		}
+	}
+
+	rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
+		ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
+		rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID, vi.ObjectUUID)
+	if err != nil {
+		return err
+	}
+
+	rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
+		rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
+	if err != nil {
+		return err
+	}
+
+	util.DebugLog(ctx, "re-generated Volume ID (%s) and image name (%s) for request name (%s)",
+		rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)
+	if rbdVol.ImageID == "" {
+		err = rbdVol.getImageID()
+		if err != nil {
+			util.ErrorLog(ctx, "failed to get image id %s: %v", rbdVol, err)
+			return err
+		}
+		err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID)
+		if err != nil {
+			util.ErrorLog(ctx, "failed to store volume id %s: %v", rbdVol, err)
+			return err
+		}
+	}
+
+	if volumeID != rbdVol.VolID {
+		return j.ReserveNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID, rbdVol.VolID)
+	}
+	return nil
+}
+
+// addNewUUIDMapping creates the mapping between two volumeIDs.
+func (rv *rbdVolume) addNewUUIDMapping(ctx context.Context, imagePoolID int64, j *journal.Connection) error {
+	var err error
+	volID := ""
+
+	id, err := j.CheckNewUUIDMapping(ctx, rv.JournalPool, rv.VolID)
+	if err == nil && id == "" {
+		volID, err = util.GenerateVolID(ctx, rv.Monitors, rv.conn.Creds, imagePoolID, rv.Pool,
+			rv.ClusterID, rv.ReservedID, volIDVersion)
+		if err != nil {
+			return err
+		}
+		if rv.VolID == volID {
+			return nil
+		}
+		return j.ReserveNewUUIDMapping(ctx, rv.JournalPool, rv.VolID, volID)
+	}
+	return err
+}


@@ -663,6 +663,7 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 		options map[string]string
 		vi      util.CSIIdentifier
 		rbdVol  *rbdVolume
+		err     error
 	)
 
 	options = make(map[string]string)
@@ -670,12 +671,14 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	// Mounter, MultiNodeWritable
 	rbdVol = &rbdVolume{VolID: volumeID}
 
-	err := vi.DecomposeCSIID(rbdVol.VolID)
+	err = vi.DecomposeCSIID(rbdVol.VolID)
 	if err != nil {
 		return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
 			ErrInvalidVolID, err, rbdVol.VolID)
 	}
 
+	// TODO check clusterID mapping exists
 	rbdVol.ClusterID = vi.ClusterID
 	options["clusterID"] = rbdVol.ClusterID
@@ -685,17 +688,6 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 		return rbdVol, err
 	}
 
-	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
-	if err != nil {
-		return rbdVol, err
-	}
-
-	err = rbdVol.Connect(cr)
-	if err != nil {
-		return rbdVol, err
-	}
-	rbdVol.JournalPool = rbdVol.Pool
-
 	rbdVol.RadosNamespace, err = util.RadosNamespace(util.CsiConfigFile, rbdVol.ClusterID)
 	if err != nil {
 		return nil, err
@@ -707,6 +699,31 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	}
 	defer j.Destroy()
 
+	// check if any volumeID mapping exists.
+	id, err := j.CheckNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID)
+	if err != nil {
+		return rbdVol, fmt.Errorf("failed to get volume id %s mapping %w",
+			volumeID, err)
+	}
+	if id != "" {
+		rbdVol.VolID = id
+		err = vi.DecomposeCSIID(rbdVol.VolID)
+		if err != nil {
+			return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
+				ErrInvalidVolID, err, rbdVol.VolID)
+		}
+	}
+
+	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
+	if err != nil {
+		return rbdVol, err
+	}
+
+	err = rbdVol.Connect(cr)
+	if err != nil {
+		return rbdVol, err
+	}
+	rbdVol.JournalPool = rbdVol.Pool
+
 	imageAttributes, err := j.GetImageAttributes(
 		ctx, rbdVol.Pool, vi.ObjectUUID, false)
 	if err != nil {
@@ -982,10 +999,7 @@ type imageInfo struct {
 
 // parentInfo spec for parent volume info.
 type mirroring struct {
-	Mode     string `json:"mode"`
-	State    string `json:"state"`
-	GlobalID string `json:"global_id"`
-	Primary  bool   `json:"primary"`
+	Primary bool `json:"primary"`
 }
 
 // updateVolWithImageInfo updates provided rbdVolume with information from on-disk data