mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-05 11:39:29 +00:00
68bd44beba
In the case of Disaster Recovery failover, the user expected to create the static PVC's. We have planned not to go with the PVC name and namespace for many reasons (as in kubernetes it's planned to support PVC transfer to a new namespace with a different name and with new features coming in like data populator etc). For now, we are planning to go with static PVC's to support async mirroring. During Async mirroring only the RBD images are mirrored to the secondary site, and when the user creates the static PVC's on the failover we need to regenerate the omap data. The volumeHandler in PV spec is an encoded string which contains clusterID and poolID and image UUID, The clusterID and poolID won't remain same on both the clusters, for that cephcsi need to generate the new volume handler and its to create a mapping between new volume handler and old volume handler with that whenever cephcsi gets csi requests it check if the mapping exists it will pull the new volume handler and continues other operations. The new controller watches for the PVs created, It checks if the omap exists if it doesn't it will regenerate the entire omap data. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
168 lines
5.2 KiB
Go
168 lines
5.2 KiB
Go
/*
|
|
Copyright 2020 The Ceph-CSI Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
package persistentvolume
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
ctrl "github.com/ceph/ceph-csi/internal/controller"
|
|
"github.com/ceph/ceph-csi/internal/rbd"
|
|
"github.com/ceph/ceph-csi/internal/util"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
"sigs.k8s.io/controller-runtime/pkg/source"
|
|
)
|
|
|
|
// ReconcilePersistentVolume reconciles a PersistentVolume object.
|
|
type ReconcilePersistentVolume struct {
|
|
client client.Client
|
|
config ctrl.Config
|
|
}
|
|
|
|
var _ reconcile.Reconciler = &ReconcilePersistentVolume{}
|
|
var _ ctrl.ContollerManager = &ReconcilePersistentVolume{}
|
|
|
|
// Init will add the ReconcilePersistentVolume to the list.
|
|
func Init() {
|
|
// add ReconcilePersistentVolume to the list
|
|
ctrl.ControllerList = append(ctrl.ControllerList, ReconcilePersistentVolume{})
|
|
}
|
|
|
|
// Add adds the newPVReconciler.
|
|
func (r ReconcilePersistentVolume) Add(mgr manager.Manager, config ctrl.Config) error {
|
|
return add(mgr, newPVReconciler(mgr, config))
|
|
}
|
|
|
|
// newReconciler returns a ReconcilePersistentVolume.
|
|
func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconciler {
|
|
r := &ReconcilePersistentVolume{
|
|
client: mgr.GetClient(),
|
|
config: config,
|
|
}
|
|
return r
|
|
}
|
|
|
|
func add(mgr manager.Manager, r reconcile.Reconciler) error {
|
|
// Create a new controller
|
|
c, err := controller.New("persistentvolume-controller", mgr, controller.Options{MaxConcurrentReconciles: 1, Reconciler: r})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Watch for changes to PersistentVolumes
|
|
err = c.Watch(&source.Kind{Type: &corev1.PersistentVolume{}}, &handler.EnqueueRequestForObject{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReconcilePersistentVolume) getCredentials(name, namespace string) (map[string]string, error) {
|
|
secret := &corev1.Secret{}
|
|
err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, secret)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", name, namespace, err)
|
|
}
|
|
|
|
credentials := map[string]string{}
|
|
for key, value := range secret.Data {
|
|
credentials[key] = string(value)
|
|
}
|
|
return credentials, nil
|
|
}
|
|
|
|
// reconcilePV will extract the image details from the pv spec and regenerates
|
|
// the omap data.
|
|
func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
|
|
pv, ok := obj.(*corev1.PersistentVolume)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == r.config.DriverName {
|
|
pool := pv.Spec.CSI.VolumeAttributes["pool"]
|
|
journalPool := pv.Spec.CSI.VolumeAttributes["journalPool"]
|
|
requestName := pv.Name
|
|
imageName := pv.Spec.CSI.VolumeAttributes["imageName"]
|
|
volumeHandler := pv.Spec.CSI.VolumeHandle
|
|
secretName := ""
|
|
secretNamespace := ""
|
|
|
|
if pv.Spec.CSI.ControllerExpandSecretRef != nil {
|
|
secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name
|
|
secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace
|
|
} else if pv.Spec.CSI.NodeStageSecretRef != nil {
|
|
secretName = pv.Spec.CSI.NodeStageSecretRef.Name
|
|
secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
|
|
}
|
|
if secretName == "" || secretNamespace == "" {
|
|
errStr := "secretname or secret namespace is empty"
|
|
util.ErrorLogMsg(errStr)
|
|
return errors.New(errStr)
|
|
}
|
|
|
|
secrets, err := r.getCredentials(secretName, secretNamespace)
|
|
if err != nil {
|
|
util.ErrorLogMsg("failed to get secrets %s", err)
|
|
return err
|
|
}
|
|
cr, err := util.NewUserCredentials(secrets)
|
|
if err != nil {
|
|
util.ErrorLogMsg("failed to get user credentials %s", err)
|
|
return err
|
|
}
|
|
defer cr.DeleteCredentials()
|
|
err = rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
|
|
if err != nil {
|
|
util.ErrorLogMsg("failed to regenerate journal %s", err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Reconcile reconciles the PersitentVolume object and creates a new omap entries
|
|
// for the volume.
|
|
func (r *ReconcilePersistentVolume) Reconcile(request reconcile.Request) (reconcile.Result, error) {
|
|
pv := &corev1.PersistentVolume{}
|
|
err := r.client.Get(context.TODO(), request.NamespacedName, pv)
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
return reconcile.Result{}, nil
|
|
}
|
|
return reconcile.Result{}, err
|
|
}
|
|
// Check if the object is under deletion
|
|
if !pv.GetDeletionTimestamp().IsZero() {
|
|
return reconcile.Result{}, nil
|
|
}
|
|
|
|
err = r.reconcilePV(pv)
|
|
if err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
return reconcile.Result{}, nil
|
|
}
|