mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-18 11:00:25 +00:00
rbd:store/Read volumeID in/from PV annotation
In the case of the Async DR, the volumeID will not be the same if the clusterID or the PoolID is different, With Earlier implementation, it is expected that the new volumeID mapping is stored in the rados omap pool. In the case of the ControllerExpand or the DeleteVolume Request, the only volumeID will be sent it's not possible to find the corresponding poolID in the new cluster. With This Change, it works as below The csi-rbdplugin-controller will watch for the PV objects, when there are any PV objects created it will check the omap already exists, If the omap doesn't exist it will generate the new volumeID and it checks for the volumeID mapping entry in the PV annotation, if the mapping does not exist, it will add the new entry to the PV annotation. The cephcsi will check for the PV annotations if the omap does not exist if the mapping exists in the PV annotation, it will use the new volumeID for further operations. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
9aea701bd9
commit
0f8813d89f
@ -75,12 +75,12 @@ To solve this problem, We will have a new controller(rbdplugin controller)
|
|||||||
running as part of provisioner pod which watches for the PV objects. When a PV
|
running as part of provisioner pod which watches for the PV objects. When a PV
|
||||||
is created it will extract the required information from the PV spec and it
|
is created it will extract the required information from the PV spec and it
|
||||||
will regenerate the OMAP data and also it will generate a new VolumeHandle
|
will regenerate the OMAP data and also it will generate a new VolumeHandle
|
||||||
(`newclusterID-newpoolID-volumeuniqueID`) and it creates an OMAP object for
|
(`newclusterID-newpoolID-volumeuniqueID`) and it adds a PV annotation
|
||||||
mapping between old VolumeHandle and new VolumeHandle. Whenever Ceph-CSI gets a
|
`csi.ceph.io/volume-handle` for mapping between old VolumeHandle and new
|
||||||
RPC request with older VolumeHandle, it will check if any new VolumeHandle
|
VolumeHandle. Whenever Ceph-CSI gets a RPC request with older VolumeHandle, it
|
||||||
exists for the old VolumeHandle. If yes, it uses the new VolumeHandle for
|
will check if any new VolumeHandle exists for the old VolumeHandle. If yes, it
|
||||||
internal operations (to get pool name, Ceph monitor details from the ClusterID
|
uses the new VolumeHandle for internal operations (to get pool name, Ceph
|
||||||
etc).
|
monitor details from the ClusterID etc).
|
||||||
|
|
||||||
Currently, We are making use of watchers in node stage request to make sure
|
Currently, We are making use of watchers in node stage request to make sure
|
||||||
ReadWriteOnce (RWO) PVC is mounted on a single node at a given point in time.
|
ReadWriteOnce (RWO) PVC is mounted on a single node at a given point in time.
|
||||||
|
@ -122,6 +122,24 @@ func checkStaticVolume(pv *corev1.PersistentVolume) (bool, error) {
|
|||||||
return static, nil
|
return static, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// storeVolumeIDInPV stores the new volumeID in PV object.
|
||||||
|
func (r ReconcilePersistentVolume) storeVolumeIDInPV(pv *corev1.PersistentVolume, newVolumeID string) error {
|
||||||
|
if v, ok := pv.Annotations[rbd.PVVolumeHandleAnnotationKey]; ok {
|
||||||
|
if v == newVolumeID {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pv.Annotations == nil {
|
||||||
|
pv.Annotations = make(map[string]string)
|
||||||
|
}
|
||||||
|
if pv.Labels == nil {
|
||||||
|
pv.Labels = make(map[string]string)
|
||||||
|
}
|
||||||
|
pv.Labels[rbd.PVReplicatedLabelKey] = rbd.PVReplicatedLabelValue
|
||||||
|
pv.Annotations[rbd.PVVolumeHandleAnnotationKey] = newVolumeID
|
||||||
|
return r.client.Update(context.TODO(), pv)
|
||||||
|
}
|
||||||
|
|
||||||
// reconcilePV will extract the image details from the pv spec and regenerates
|
// reconcilePV will extract the image details from the pv spec and regenerates
|
||||||
// the omap data.
|
// the omap data.
|
||||||
func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
|
func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
|
||||||
@ -163,11 +181,18 @@ func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
err = rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
|
rbdVolID, err := rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.ErrorLogMsg("failed to regenerate journal %s", err)
|
util.ErrorLogMsg("failed to regenerate journal %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if rbdVolID != volumeHandler {
|
||||||
|
err = r.storeVolumeIDInPV(pv, rbdVolID)
|
||||||
|
if err != nil {
|
||||||
|
util.ErrorLogMsg("failed to store volumeID in PV %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,6 +25,15 @@ import (
|
|||||||
"github.com/ceph/ceph-csi/internal/util"
|
"github.com/ceph/ceph-csi/internal/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// PVVolumeHandleAnnotationKey is the annotation key set on the PV object.
|
||||||
|
PVVolumeHandleAnnotationKey = "csi.ceph.io/volume-handle"
|
||||||
|
// PVReplicatedLabelKey is the label key set on PV object.
|
||||||
|
PVReplicatedLabelKey = "csi.ceph.io/replicated-volume"
|
||||||
|
// PVReplicatedLabelValue is the label value set on PV object.
|
||||||
|
PVReplicatedLabelValue = "volume-handle-detected"
|
||||||
|
)
|
||||||
|
|
||||||
func validateNonEmptyField(field, fieldName, structName string) error {
|
func validateNonEmptyField(field, fieldName, structName string) error {
|
||||||
if field == "" {
|
if field == "" {
|
||||||
return fmt.Errorf("value '%s' in '%s' structure cannot be empty", fieldName, structName)
|
return fmt.Errorf("value '%s' in '%s' structure cannot be empty", fieldName, structName)
|
||||||
@ -479,10 +488,9 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
|
|||||||
// Extract uuid from volumeID
|
// Extract uuid from volumeID
|
||||||
// Reserve omap data
|
// Reserve omap data
|
||||||
// Generate new volume Handler
|
// Generate new volume Handler
|
||||||
// Create old volumeHandler to new handler mapping
|
|
||||||
// The volume handler won't remain same as its contains poolID,clusterID etc
|
// The volume handler won't remain same as its contains poolID,clusterID etc
|
||||||
// which are not same across clusters.
|
// which are not same across clusters.
|
||||||
func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) error {
|
func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) (string, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
var (
|
var (
|
||||||
options map[string]string
|
options map[string]string
|
||||||
@ -496,7 +504,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
|
|
||||||
err := vi.DecomposeCSIID(rbdVol.VolID)
|
err := vi.DecomposeCSIID(rbdVol.VolID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
return "", fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
||||||
ErrInvalidVolID, err, rbdVol.VolID)
|
ErrInvalidVolID, err, rbdVol.VolID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -507,13 +515,13 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
|
rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.ErrorLog(ctx, "failed getting mons (%s)", err)
|
util.ErrorLog(ctx, "failed getting mons (%s)", err)
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
rbdVol.Pool = pool
|
rbdVol.Pool = pool
|
||||||
err = rbdVol.Connect(cr)
|
err = rbdVol.Connect(cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
rbdVol.JournalPool = journalPool
|
rbdVol.JournalPool = journalPool
|
||||||
if rbdVol.JournalPool == "" {
|
if rbdVol.JournalPool == "" {
|
||||||
@ -522,13 +530,13 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
volJournal = journal.NewCSIVolumeJournal("default")
|
volJournal = journal.NewCSIVolumeJournal("default")
|
||||||
j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
|
j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
defer j.Destroy()
|
defer j.Destroy()
|
||||||
|
|
||||||
journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
|
journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
rbdVol.RequestName = requestName
|
rbdVol.RequestName = requestName
|
||||||
@ -538,7 +546,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
imageData, err := j.CheckReservation(
|
imageData, err := j.CheckReservation(
|
||||||
ctx, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
|
ctx, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
if imageData != nil {
|
if imageData != nil {
|
||||||
rbdVol.ReservedID = imageData.ImageUUID
|
rbdVol.ReservedID = imageData.ImageUUID
|
||||||
@ -547,23 +555,23 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
if rbdVol.ImageID == "" {
|
if rbdVol.ImageID == "" {
|
||||||
err = rbdVol.storeImageID(ctx, j)
|
err = rbdVol.storeImageID(ctx, j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = rbdVol.addNewUUIDMapping(ctx, imagePoolID, j)
|
|
||||||
if err != nil {
|
|
||||||
util.ErrorLog(ctx, "failed to add UUID mapping %s: %v", rbdVol, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// As the omap already exists for this image ID return nil.
|
// As the omap already exists for this image ID return nil.
|
||||||
return nil
|
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
|
||||||
|
rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return rbdVol.VolID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
|
rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
|
||||||
ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
|
ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
|
||||||
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID, vi.ObjectUUID, rbdVol.Owner)
|
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID, vi.ObjectUUID, rbdVol.Owner)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -578,7 +586,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
|
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
|
||||||
rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
|
rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
util.DebugLog(ctx, "re-generated Volume ID (%s) and image name (%s) for request name (%s)",
|
util.DebugLog(ctx, "re-generated Volume ID (%s) and image name (%s) for request name (%s)",
|
||||||
@ -586,14 +594,11 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
|
|||||||
if rbdVol.ImageID == "" {
|
if rbdVol.ImageID == "" {
|
||||||
err = rbdVol.storeImageID(ctx, j)
|
err = rbdVol.storeImageID(ctx, j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if volumeID != rbdVol.VolID {
|
return rbdVol.VolID, nil
|
||||||
return j.ReserveNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID, rbdVol.VolID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// storeImageID retrieves the image ID and stores it in OMAP.
|
// storeImageID retrieves the image ID and stores it in OMAP.
|
||||||
@ -610,23 +615,3 @@ func (rv *rbdVolume) storeImageID(ctx context.Context, j *journal.Connection) er
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addNewUUIDMapping creates the mapping between two volumeID.
|
|
||||||
func (rv *rbdVolume) addNewUUIDMapping(ctx context.Context, imagePoolID int64, j *journal.Connection) error {
|
|
||||||
var err error
|
|
||||||
volID := ""
|
|
||||||
|
|
||||||
id, err := j.CheckNewUUIDMapping(ctx, rv.JournalPool, rv.VolID)
|
|
||||||
if err == nil && id == "" {
|
|
||||||
volID, err = util.GenerateVolID(ctx, rv.Monitors, rv.conn.Creds, imagePoolID, rv.Pool,
|
|
||||||
rv.ClusterID, rv.ReservedID, volIDVersion)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if rv.VolID == volID {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return j.ReserveNewUUIDMapping(ctx, rv.JournalPool, rv.VolID, volID)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/golang/protobuf/ptypes"
|
"github.com/golang/protobuf/ptypes"
|
||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
"github.com/golang/protobuf/ptypes/timestamp"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/cloud-provider/volume/helpers"
|
"k8s.io/cloud-provider/volume/helpers"
|
||||||
)
|
)
|
||||||
@ -777,9 +778,8 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
|
// generateVolumeFromVolumeID generates a rbdVolume structure from the provided identifier.
|
||||||
// the structure with elements from on-disk image metadata as well.
|
func generateVolumeFromVolumeID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
|
||||||
func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
|
|
||||||
var (
|
var (
|
||||||
options map[string]string
|
options map[string]string
|
||||||
vi util.CSIIdentifier
|
vi util.CSIIdentifier
|
||||||
@ -821,20 +821,6 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
|
|||||||
}
|
}
|
||||||
defer j.Destroy()
|
defer j.Destroy()
|
||||||
|
|
||||||
// check is there any volumeID mapping exists.
|
|
||||||
id, err := j.CheckNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID)
|
|
||||||
if err != nil {
|
|
||||||
return rbdVol, fmt.Errorf("failed to get volume id %s mapping %w",
|
|
||||||
volumeID, err)
|
|
||||||
}
|
|
||||||
if id != "" {
|
|
||||||
rbdVol.VolID = id
|
|
||||||
err = vi.DecomposeCSIID(rbdVol.VolID)
|
|
||||||
if err != nil {
|
|
||||||
return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
|
||||||
ErrInvalidVolID, err, rbdVol.VolID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
|
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rbdVol, err
|
return rbdVol, err
|
||||||
@ -883,6 +869,38 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
|
|||||||
return rbdVol, err
|
return rbdVol, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
|
||||||
|
// the structure with elements from on-disk image metadata as well.
|
||||||
|
func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
|
||||||
|
vol, err := generateVolumeFromVolumeID(ctx, volumeID, cr, secrets)
|
||||||
|
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
|
||||||
|
return vol, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the volume details are not found in the OMAP it can be a mirrored RBD
|
||||||
|
// image and the OMAP is already generated and the volumeHandle might not
|
||||||
|
// be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for
|
||||||
|
// the new volumeHandle. If the new volumeHandle is found, generate the RBD
|
||||||
|
// volume structure from the new volumeHandle.
|
||||||
|
c := util.NewK8sClient()
|
||||||
|
listOpt := metav1.ListOptions{
|
||||||
|
LabelSelector: PVReplicatedLabelKey,
|
||||||
|
}
|
||||||
|
pvlist, pErr := c.CoreV1().PersistentVolumes().List(context.TODO(), listOpt)
|
||||||
|
if pErr != nil {
|
||||||
|
return vol, pErr
|
||||||
|
}
|
||||||
|
for i := range pvlist.Items {
|
||||||
|
if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID {
|
||||||
|
if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok {
|
||||||
|
util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID)
|
||||||
|
return generateVolumeFromVolumeID(ctx, v, cr, secrets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return vol, err
|
||||||
|
}
|
||||||
|
|
||||||
func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
|
func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
|
||||||
var (
|
var (
|
||||||
ok bool
|
ok bool
|
||||||
|
Loading…
Reference in New Issue
Block a user