cephfs: refactor cephfs core functions

This commits refactors the cephfs core
functions with interfaces. This helps in
better code structuring and writing the
unit test cases.

update #852

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2022-02-15 17:41:09 +05:30
committed by mergify[bot]
parent f19ca4a473
commit e9802c4940
12 changed files with 416 additions and 316 deletions

View File

@ -21,7 +21,6 @@ import (
"errors"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -29,16 +28,16 @@ import (
type cephFSCloneState string
const (
// cephFSCloneError indicates that fetching the clone state returned an error.
cephFSCloneError = cephFSCloneState("")
// cephFSCloneFailed indicates that clone is in failed state.
cephFSCloneFailed = cephFSCloneState("failed")
// cephFSClonePending indicates that clone is in pending state.
cephFSClonePending = cephFSCloneState("pending")
// cephFSCloneInprogress indicates that clone is in in-progress state.
cephFSCloneInprogress = cephFSCloneState("in-progress")
// cephFSCloneComplete indicates that clone is in complete state.
cephFSCloneComplete = cephFSCloneState("complete")
// CephFSCloneError indicates that fetching the clone state returned an error.
CephFSCloneError = cephFSCloneState("")
// CephFSCloneFailed indicates that clone is in failed state.
CephFSCloneFailed = cephFSCloneState("failed")
// CephFSClonePending indicates that clone is in pending state.
CephFSClonePending = cephFSCloneState("pending")
// CephFSCloneInprogress indicates that clone is in in-progress state.
CephFSCloneInprogress = cephFSCloneState("in-progress")
// CephFSCloneComplete indicates that clone is in complete state.
CephFSCloneComplete = cephFSCloneState("complete")
// SnapshotIsProtected string indicates that the snapshot is currently protected.
SnapshotIsProtected = "yes"
@ -47,28 +46,28 @@ const (
// toError checks the state of the clone if it's not cephFSCloneComplete.
func (cs cephFSCloneState) toError() error {
switch cs {
case cephFSCloneComplete:
case CephFSCloneComplete:
return nil
case cephFSCloneError:
case CephFSCloneError:
return cerrors.ErrInvalidClone
case cephFSCloneInprogress:
case CephFSCloneInprogress:
return cerrors.ErrCloneInProgress
case cephFSClonePending:
case CephFSClonePending:
return cerrors.ErrClonePending
case cephFSCloneFailed:
case CephFSCloneFailed:
return cerrors.ErrCloneFailed
}
return nil
}
func CreateCloneFromSubvolume(
// CreateCloneFromSubvolume creates a clone from a subvolume.
func (s *subVolumeClient) CreateCloneFromSubvolume(
ctx context.Context,
volID, cloneID fsutil.VolumeID,
volOpt,
parentvolOpt *VolumeOptions) error {
snapshotID := cloneID
err := parentvolOpt.CreateSnapshot(ctx, snapshotID, volID)
parentvolOpt *SubVolume) error {
snapshotID := s.VolID
snapClient := NewSnapshot(s.conn, snapshotID, parentvolOpt)
err := snapClient.CreateSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err)
@ -82,17 +81,17 @@ func CreateCloneFromSubvolume(
)
defer func() {
if protectErr != nil {
err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID)
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
if cloneErr != nil {
if err = volOpt.PurgeVolume(ctx, cloneID, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", cloneID, err)
if err = s.PurgeVolume(ctx, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
}
if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil {
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
@ -100,47 +99,46 @@ func CreateCloneFromSubvolume(
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil {
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = parentvolOpt.ProtectSnapshot(ctx, snapshotID, volID)
protectErr = snapClient.ProtectSnapshot(ctx)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)
return protectErr
}
cloneErr = parentvolOpt.cloneSnapshot(ctx, volID, snapshotID, cloneID, volOpt)
cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", volID, snapshotID, cloneID, cloneErr)
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
return cloneErr
}
cloneState, cloneErr := volOpt.getCloneState(ctx, cloneID)
cloneState, cloneErr := s.GetCloneState(ctx)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to get clone state: %v", cloneErr)
return cloneErr
}
if cloneState != cephFSCloneComplete {
log.ErrorLog(ctx, "clone %s did not complete: %v", cloneID, cloneState.toError())
if cloneState != CephFSCloneComplete {
log.ErrorLog(ctx, "clone %s did not complete: %v", s.VolID, cloneState.toError())
return cloneState.toError()
}
err = volOpt.ExpandVolume(ctx, cloneID, volOpt.Size)
err = s.ExpandVolume(ctx, s.Size)
if err != nil {
log.ErrorLog(ctx, "failed to expand volume %s: %v", cloneID, err)
log.ErrorLog(ctx, "failed to expand volume %s: %v", s.VolID, err)
return err
}
// As we completed clone, remove the intermediate snap
if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil {
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
@ -150,7 +148,7 @@ func CreateCloneFromSubvolume(
return err
}
}
if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil {
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
return err
@ -159,14 +157,14 @@ func CreateCloneFromSubvolume(
return nil
}
func cleanupCloneFromSubvolumeSnapshot(
ctx context.Context,
volID, cloneID fsutil.VolumeID,
parentVolOpt *VolumeOptions) error {
// CleanupSnapshotFromSubvolume removes the snapshot from the subvolume.
func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
ctx context.Context, parentVol *SubVolume) error {
// snapshot name is same as clone name as we need a name which can be
// identified during PVC-PVC cloning.
snapShotID := cloneID
snapInfo, err := parentVolOpt.GetSnapshotInfo(ctx, snapShotID, volID)
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
@ -176,14 +174,14 @@ func cleanupCloneFromSubvolumeSnapshot(
}
if snapInfo.Protected == SnapshotIsProtected {
err = parentVolOpt.UnprotectSnapshot(ctx, snapShotID, volID)
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)
return err
}
}
err = parentVolOpt.DeleteSnapshot(ctx, snapShotID, volID)
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)
@ -193,45 +191,39 @@ func cleanupCloneFromSubvolumeSnapshot(
return nil
}
func CreateCloneFromSnapshot(
ctx context.Context,
parentVolOpt, volOptions *VolumeOptions,
vID *VolumeIdentifier,
sID *SnapshotIdentifier) error {
snapID := fsutil.VolumeID(sID.FsSnapshotName)
err := parentVolOpt.cloneSnapshot(
ctx,
fsutil.VolumeID(sID.FsSubvolName),
snapID,
fsutil.VolumeID(vID.FsSubvolName),
volOptions)
// CreateSnapshotFromSubvolume creates a clone from subvolume snapshot.
func (s *subVolumeClient) CreateCloneFromSnapshot(
ctx context.Context, snap Snapshot) error {
snapID := snap.SnapshotID
snapClient := NewSnapshot(s.conn, snapID, snap.SubVolume)
err := snapClient.CloneSnapshot(ctx, s.SubVolume)
if err != nil {
return err
}
defer func() {
if err != nil {
if !cerrors.IsCloneRetryError(err) {
if dErr := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), true); dErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr)
if dErr := s.PurgeVolume(ctx, true); dErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, dErr)
}
}
}
}()
cloneState, err := volOptions.getCloneState(ctx, fsutil.VolumeID(vID.FsSubvolName))
cloneState, err := s.GetCloneState(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to get clone state: %v", err)
return err
}
if cloneState != cephFSCloneComplete {
if cloneState != CephFSCloneComplete {
return cloneState.toError()
}
err = volOptions.ExpandVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), volOptions.Size)
err = s.ExpandVolume(ctx, s.Size)
if err != nil {
log.ErrorLog(ctx, "failed to expand volume %s with error: %v", vID.FsSubvolName, err)
log.ErrorLog(ctx, "failed to expand volume %s with error: %v", s.VolID, err)
return err
}
@ -239,24 +231,25 @@ func CreateCloneFromSnapshot(
return nil
}
func (vo *VolumeOptions) getCloneState(ctx context.Context, volID fsutil.VolumeID) (cephFSCloneState, error) {
fsa, err := vo.conn.GetFSAdmin()
// GetCloneState returns the clone state of the subvolume.
func (s *subVolumeClient) GetCloneState(ctx context.Context) (cephFSCloneState, error) {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(
ctx,
"could not get FSAdmin, can get clone status for volume %s with ID %s: %v",
vo.FsName,
string(volID),
s.FsName,
s.VolID,
err)
return cephFSCloneError, err
return CephFSCloneError, err
}
cs, err := fsa.CloneStatus(vo.FsName, vo.SubvolumeGroup, string(volID))
cs, err := fsa.CloneStatus(s.FsName, s.SubvolumeGroup, s.VolID)
if err != nil {
log.ErrorLog(ctx, "could not get clone state for volume %s with ID %s: %v", vo.FsName, string(volID), err)
log.ErrorLog(ctx, "could not get clone state for volume %s with ID %s: %v", s.FsName, s.VolID, err)
return cephFSCloneError, err
return CephFSCloneError, err
}
return cephFSCloneState(cs.State), nil

View File

@ -27,11 +27,11 @@ import (
func TestCloneStateToError(t *testing.T) {
t.Parallel()
errorState := make(map[cephFSCloneState]error)
errorState[cephFSCloneComplete] = nil
errorState[cephFSCloneError] = cerrors.ErrInvalidClone
errorState[cephFSCloneInprogress] = cerrors.ErrCloneInProgress
errorState[cephFSClonePending] = cerrors.ErrClonePending
errorState[cephFSCloneFailed] = cerrors.ErrCloneFailed
errorState[CephFSCloneComplete] = nil
errorState[CephFSCloneError] = cerrors.ErrInvalidClone
errorState[CephFSCloneInprogress] = cerrors.ErrCloneInProgress
errorState[CephFSClonePending] = cerrors.ErrClonePending
errorState[CephFSCloneFailed] = cerrors.ErrCloneFailed
for state, err := range errorState {
assert.Equal(t, state.toError(), err)

View File

@ -1,441 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/golang/protobuf/ptypes/timestamp"
)
var (
// VolJournal is used to maintain RADOS based journals for CO generated.
// VolumeName to backing CephFS subvolumes.
VolJournal *journal.Config
// SnapJournal is used to maintain RADOS based journals for CO generated.
// SnapshotName to backing CephFS subvolumes.
SnapJournal *journal.Config
)
// VolumeIdentifier structure contains an association between the CSI VolumeID to its subvolume
// name on the backing CephFS instance.
type VolumeIdentifier struct {
FsSubvolName string
VolumeID string
}
type SnapshotIdentifier struct {
FsSnapshotName string
SnapshotID string
RequestName string
CreationTime *timestamp.Timestamp
FsSubvolName string
}
/*
CheckVolExists checks to determine if passed in RequestName in volOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.
These functions also cleanup omap reservations that are stale. I.e when omap entries exist and
backing subvolumes are missing, or one of the omaps exist and the next is missing. This is
because, the order of omap creation and deletion are inverse of each other, and protected by the
request name lock, and hence any stale omaps are leftovers from incomplete transactions and are
hence safe to garbage collect.
*/
// nolint:gocognit,gocyclo,nestif,cyclop // TODO: reduce complexity
func CheckVolExists(ctx context.Context,
volOptions,
parentVolOpt *VolumeOptions,
pvID *VolumeIdentifier,
sID *SnapshotIdentifier,
cr *util.Credentials) (*VolumeIdentifier, error) {
var vid VolumeIdentifier
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
if imageData == nil {
return nil, nil
}
imageUUID := imageData.ImageUUID
vid.FsSubvolName = imageData.ImageAttributes.ImageName
if sID != nil || pvID != nil {
cloneState, cloneStateErr := volOptions.getCloneState(ctx, fsutil.VolumeID(vid.FsSubvolName))
if cloneStateErr != nil {
if errors.Is(cloneStateErr, cerrors.ErrVolumeNotFound) {
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
if cloneState == cephFSCloneInprogress {
return nil, cerrors.ErrCloneInProgress
}
if cloneState == cephFSClonePending {
return nil, cerrors.ErrClonePending
}
if cloneState == cephFSCloneFailed {
log.ErrorLog(ctx,
"clone failed, deleting subvolume clone. vol=%s, subvol=%s subvolgroup=%s",
volOptions.FsName,
vid.FsSubvolName,
volOptions.SubvolumeGroup)
err = volOptions.PurgeVolume(ctx, fsutil.VolumeID(vid.FsSubvolName), true)
if err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vid.FsSubvolName, err)
return nil, err
}
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
if cloneState != cephFSCloneComplete {
return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName)
}
}
volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName))
if err != nil {
if errors.Is(err, cerrors.ErrVolumeNotFound) {
// If the subvolume is not present, cleanup the stale snapshot
// created for clone.
if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx,
fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
// check if topology constraints match what is found
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// TODO: CephFS topology support is postponed till we get the same
// TODO: size checks
// found a volume already available, process and return it!
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx,
fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
return &vid, nil
}
// UndoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName.
func UndoVolReservation(
ctx context.Context,
volOptions *VolumeOptions,
vid VolumeIdentifier,
secret map[string]string) error {
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return err
}
defer cr.DeleteCredentials()
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
defer j.Destroy()
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return err
}
func updateTopologyConstraints(volOpts *VolumeOptions) error {
// update request based on topology constrained parameters (if present)
poolName, _, topology, err := util.FindPoolAndTopology(volOpts.TopologyPools, volOpts.TopologyRequirement)
if err != nil {
return err
}
if poolName != "" {
volOpts.Pool = poolName
volOpts.Topology = topology
}
return nil
}
// ReserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and,
// to generate the volume identifier for the reserved UUID.
func ReserveVol(ctx context.Context, volOptions *VolumeOptions, secret map[string]string) (*VolumeIdentifier, error) {
var (
vid VolumeIdentifier
imageUUID string
err error
)
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return nil, err
}
defer cr.DeleteCredentials()
err = updateTopologyConstraints(volOptions)
if err != nil {
return nil, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageUUID, vid.FsSubvolName, err = j.ReserveName(
ctx, volOptions.MetadataPool, util.InvalidPoolID,
volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName,
volOptions.NamePrefix, "", "", volOptions.ReservedID, "")
if err != nil {
return nil, err
}
// generate the volume ID to return to the CO system
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Generated Volume ID (%s) and subvolume name (%s) for request name (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
return &vid, nil
}
// ReserveSnap is a helper routine to request a UUID reservation for the CSI SnapName and,
// to generate the snapshot identifier for the reserved UUID.
func ReserveSnap(
ctx context.Context,
volOptions *VolumeOptions,
parentSubVolName string,
snap *CephfsSnapshot,
cr *util.Credentials) (*SnapshotIdentifier, error) {
var (
vid SnapshotIdentifier
imageUUID string
err error
)
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageUUID, vid.FsSnapshotName, err = j.ReserveName(
ctx, volOptions.MetadataPool, util.InvalidPoolID,
volOptions.MetadataPool, util.InvalidPoolID, snap.RequestName,
snap.NamePrefix, parentSubVolName, "", snap.ReservedID, "")
if err != nil {
return nil, err
}
// generate the snapshot ID to return to the CO system
vid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Generated Snapshot ID (%s) for request name (%s)",
vid.SnapshotID, snap.RequestName)
return &vid, nil
}
// UndoSnapReservation is a helper routine to undo a name reservation for a CSI SnapshotName.
func UndoSnapReservation(
ctx context.Context,
volOptions *VolumeOptions,
vid SnapshotIdentifier,
snapName string,
cr *util.Credentials) error {
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
defer j.Destroy()
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSnapshotName, snapName)
return err
}
/*
CheckSnapExists checks to determine if passed in RequestName in volOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.
These functions also cleanup omap reservations that are stale. I.e. when omap entries exist and
backing subvolumes are missing, or one of the omaps exist and the next is missing. This is
because, the order of omap creation and deletion are inverse of each other, and protected by the
request name lock, and hence any stale omaps are leftovers from incomplete transactions and are
hence safe to garbage collect.
*/
func CheckSnapExists(
ctx context.Context,
volOptions *VolumeOptions,
parentSubVolName string,
snap *CephfsSnapshot,
cr *util.Credentials) (*SnapshotIdentifier, *SnapshotInfo, error) {
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
}
defer j.Destroy()
snapData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, snap.RequestName, snap.NamePrefix, parentSubVolName, "")
if err != nil {
return nil, nil, err
}
if snapData == nil {
return nil, nil, nil
}
sid := &SnapshotIdentifier{}
snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapInfo, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName))
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)
return nil, nil, err
}
return nil, nil, err
}
defer func() {
if err != nil {
err = volOptions.DeleteSnapshot(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName))
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", snapID, err)
return
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)
if err != nil {
log.ErrorLog(ctx, "removing reservation failed for snapshot %s: %v", snapID, err)
}
}
}()
tm, err := fsutil.ParseTime(ctx, snapInfo.CreatedAt)
if err != nil {
return nil, nil, err
}
sid.CreationTime = tm
// found a snapshot already available, process and return it!
sid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, snapUUID, fsutil.VolIDVersion)
if err != nil {
return nil, nil, err
}
log.DebugLog(ctx, "Found existing snapshot (%s) with subvolume name (%s) for request (%s)",
snapData.ImageAttributes.RequestName, parentSubVolName, sid.FsSnapshotName)
return sid, &snapInfo, nil
}

View File

@ -22,7 +22,7 @@ import (
"time"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
@ -36,32 +36,59 @@ const (
autoProtect = "snapshot-autoprotect"
)
// CephfsSnapshot represents a CSI snapshot and its cluster information.
type CephfsSnapshot struct {
NamePrefix string
Monitors string
// MetadataPool & Pool fields are not used atm. But its definitely good to have it in this struct
// so keeping it here
MetadataPool string
Pool string
ClusterID string
RequestName string
// ReservedID represents the ID reserved for a snapshot
ReservedID string
// SnapshotClient is the interface that holds the signature of snapshot methods
// that interacts with CephFS snapshot API's.
type SnapshotClient interface {
// CreateSnapshot creates a snapshot of the subvolume.
CreateSnapshot(ctx context.Context) error
// DeleteSnapshot deletes the snapshot of the subvolume.
DeleteSnapshot(ctx context.Context) error
// GetSnapshotInfo returns the snapshot info of the subvolume.
GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error)
// ProtectSnapshot protects the snapshot of the subvolume.
ProtectSnapshot(ctx context.Context) error
// UnprotectSnapshot unprotects the snapshot of the subvolume.
UnprotectSnapshot(ctx context.Context) error
// CloneSnapshot clones the snapshot of the subvolume.
CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error
}
func (vo *VolumeOptions) CreateSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
fsa, err := vo.conn.GetFSAdmin()
// snapshotClient is the implementation of SnapshotClient interface.
type snapshotClient struct {
*Snapshot // Embedded snapshot struct.
conn *util.ClusterConnection // Cluster connection.
}
// Snapshot represents a subvolume snapshot and its cluster information.
type Snapshot struct {
SnapshotID string // subvolume snapshot id.
*SubVolume // parent subvolume information.
}
// NewSnapshot creates a new snapshot client.
func NewSnapshot(conn *util.ClusterConnection, snapshotID string, vol *SubVolume) SnapshotClient {
return &snapshotClient{
Snapshot: &Snapshot{
SnapshotID: snapshotID,
SubVolume: vol,
},
conn: conn,
}
}
// CreateSnapshot creates a snapshot of the subvolume.
func (s *snapshotClient) CreateSnapshot(ctx context.Context) error {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.CreateSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
err = fsa.CreateSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
log.ErrorLog(ctx, "failed to create subvolume snapshot %s %s in fs %s: %s",
string(snapID), string(volID), vo.FsName, err)
s.SnapshotID, s.VolID, s.FsName, err)
return err
}
@ -69,18 +96,19 @@ func (vo *VolumeOptions) CreateSnapshot(ctx context.Context, snapID, volID fsuti
return nil
}
func (vo *VolumeOptions) DeleteSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
fsa, err := vo.conn.GetFSAdmin()
// DeleteSnapshot deletes the snapshot of the subvolume.
func (s *snapshotClient) DeleteSnapshot(ctx context.Context) error {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.ForceRemoveSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
err = fsa.ForceRemoveSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
log.ErrorLog(ctx, "failed to delete subvolume snapshot %s %s in fs %s: %s",
string(snapID), string(volID), vo.FsName, err)
s.SnapshotID, s.VolID, s.FsName, err)
return err
}
@ -95,16 +123,17 @@ type SnapshotInfo struct {
Protected string
}
func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsutil.VolumeID) (SnapshotInfo, error) {
// GetSnapshotInfo returns the snapshot info of the subvolume.
func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error) {
snap := SnapshotInfo{}
fsa, err := vo.conn.GetFSAdmin()
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return snap, err
}
info, err := fsa.SubVolumeSnapshotInfo(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
info, err := fsa.SubVolumeSnapshotInfo(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
return snap, cerrors.ErrSnapNotFound
@ -112,9 +141,9 @@ func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsut
log.ErrorLog(
ctx,
"failed to get subvolume snapshot info %s %s in fs %s with error %s",
string(volID),
string(snapID),
vo.FsName,
s.VolID,
s.SnapshotID,
s.FsName,
err)
return snap, err
@ -126,21 +155,21 @@ func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsut
return snap, nil
}
func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
// ProtectSnapshot protects the snapshot of the subvolume.
func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, vo.Features) {
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := vo.conn.GetFSAdmin()
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.ProtectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
string(snapID))
err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
@ -148,9 +177,9 @@ func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsut
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
vo.FsName,
s.VolID,
s.SnapshotID,
s.FsName,
err)
return err
@ -159,21 +188,22 @@ func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsut
return nil
}
func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
// UnprotectSnapshot unprotects the snapshot of the subvolume.
func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, vo.Features) {
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := vo.conn.GetFSAdmin()
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.UnprotectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
string(snapID))
err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID,
s.SnapshotID)
if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error.
@ -183,9 +213,9 @@ func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fs
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
vo.FsName,
s.VolID,
s.SnapshotID,
s.FsName,
err)
return err
@ -194,33 +224,33 @@ func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fs
return nil
}
func (vo *VolumeOptions) cloneSnapshot(
// CloneSnapshot clones the snapshot of the subvolume.
func (s *snapshotClient) CloneSnapshot(
ctx context.Context,
volID, snapID, cloneID fsutil.VolumeID,
cloneVolOptions *VolumeOptions,
cloneSubVol *SubVolume,
) error {
fsa, err := vo.conn.GetFSAdmin()
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
co := &admin.CloneOptions{
TargetGroup: cloneVolOptions.SubvolumeGroup,
TargetGroup: cloneSubVol.SubvolumeGroup,
}
if cloneVolOptions.Pool != "" {
co.PoolLayout = cloneVolOptions.Pool
if cloneSubVol.Pool != "" {
co.PoolLayout = cloneSubVol.Pool
}
err = fsa.CloneSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID), string(cloneID), co)
err = fsa.CloneSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, cloneSubVol.VolID, co)
if err != nil {
log.ErrorLog(
ctx,
"failed to clone subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
string(cloneID),
vo.FsName,
s.VolID,
s.SnapshotID,
cloneSubVol.VolID,
s.FsName,
err)
if errors.Is(err, rados.ErrNotFound) {
return cerrors.ErrVolumeNotFound

View File

@ -53,20 +53,75 @@ type Subvolume struct {
Features []string
}
// SubVolumeClient is the interface that holds the signature of subvolume methods
// that interacts with CephFS subvolume API's.
type SubVolumeClient interface {
// GetVolumeRootPathCeph returns the root path of the subvolume.
GetVolumeRootPathCeph(ctx context.Context) (string, error)
// CreateVolume creates a subvolume.
CreateVolume(ctx context.Context) error
// GetSubVolumeInfo returns the subvolume information.
GetSubVolumeInfo(ctx context.Context) (*Subvolume, error)
// ExpandVolume expands the volume if the requested size is greater than
// the subvolume size.
ExpandVolume(ctx context.Context, bytesQuota int64) error
// ResizeVolume resizes the volume.
ResizeVolume(ctx context.Context, bytesQuota int64) error
// PurgSubVolume removes the subvolume.
PurgeVolume(ctx context.Context, force bool) error
// CreateCloneFromSubVolume creates a clone from the subvolume.
CreateCloneFromSubvolume(ctx context.Context, parentvolOpt *SubVolume) error
// GetCloneState returns the clone state of the subvolume.
GetCloneState(ctx context.Context) (cephFSCloneState, error)
// CreateCloneFromSnapshot creates a clone from the subvolume snapshot.
CreateCloneFromSnapshot(ctx context.Context, snap Snapshot) error
// CleanupSnapshotFromSubvolume removes the snapshot from the subvolume.
CleanupSnapshotFromSubvolume(ctx context.Context, parentVol *SubVolume) error
}
// subVolumeClient implements SubVolumeClient interface.
type subVolumeClient struct {
*SubVolume // Embedded SubVolume struct.
clusterID string // Cluster ID to check subvolumegroup and resize functionality.
conn *util.ClusterConnection // Cluster connection.
}
// SubVolume holds the information about the subvolume.
type SubVolume struct {
VolID string // subvolume id.
FsName string // filesystem name.
SubvolumeGroup string // subvolume group name where subvolume will be created.
Pool string // pool name where subvolume will be created.
Features []string // subvolume features.
Size int64 // subvolume size.
}
// NewSubVolume returns a new subvolume client.
func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID string) SubVolumeClient {
return &subVolumeClient{
SubVolume: vol,
clusterID: clusterID,
conn: conn,
}
}
// GetVolumeRootPathCephDeprecated returns the root path of the subvolume.
func GetVolumeRootPathCephDeprecated(volID fsutil.VolumeID) string {
return path.Join("/", "csi-volumes", string(volID))
}
func (vo *VolumeOptions) GetVolumeRootPathCeph(ctx context.Context, volID fsutil.VolumeID) (string, error) {
fsa, err := vo.conn.GetFSAdmin()
// GetVolumeRootPathCeph returns the root path of the subvolume.
func (s *subVolumeClient) GetVolumeRootPathCeph(ctx context.Context) (string, error) {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin err %s", err)
return "", err
}
svPath, err := fsa.SubVolumePath(vo.FsName, vo.SubvolumeGroup, string(volID))
svPath, err := fsa.SubVolumePath(s.FsName, s.SubvolumeGroup, s.VolID)
if err != nil {
log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", string(volID), err)
log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", s.VolID, err)
if errors.Is(err, rados.ErrNotFound) {
return "", util.JoinErrors(cerrors.ErrVolumeNotFound, err)
}
@ -77,17 +132,18 @@ func (vo *VolumeOptions) GetVolumeRootPathCeph(ctx context.Context, volID fsutil
return svPath, nil
}
func (vo *VolumeOptions) GetSubVolumeInfo(ctx context.Context, volID fsutil.VolumeID) (*Subvolume, error) {
fsa, err := vo.conn.GetFSAdmin()
// GetSubVolumeInfo returns the subvolume information.
func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, error) {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", vo.FsName, err)
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", s.FsName, err)
return nil, err
}
info, err := fsa.SubVolumeInfo(vo.FsName, vo.SubvolumeGroup, string(volID))
info, err := fsa.SubVolumeInfo(s.FsName, s.SubvolumeGroup, s.VolID)
if err != nil {
log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", string(volID), err)
log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", s.VolID, err)
if errors.Is(err, rados.ErrNotFound) {
return nil, cerrors.ErrVolumeNotFound
}
@ -111,7 +167,7 @@ func (vo *VolumeOptions) GetSubVolumeInfo(ctx context.Context, volID fsutil.Volu
// or nil (in case the subvolume is in snapshot-retained state),
// just continue without returning quota information.
if !(info.BytesQuota == fsAdmin.Infinite || info.State == fsAdmin.StateSnapRetained) {
return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", string(volID), info.BytesQuota)
return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", s.VolID, info.BytesQuota)
}
} else {
subvol.BytesQuota = int64(bc)
@ -140,50 +196,52 @@ type localClusterState struct {
subVolumeGroupCreated bool
}
func CreateVolume(ctx context.Context, volOptions *VolumeOptions, volID fsutil.VolumeID, bytesQuota int64) error {
// verify if corresponding ClusterID key is present in the map,
// CreateVolume creates a subvolume.
func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent := clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent {
clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{}
if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent {
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
ca, err := volOptions.conn.GetFSAdmin()
ca, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not create subvolume %s: %s", string(volID), err)
log.ErrorLog(ctx, "could not get FSAdmin, can not create subvolume %s: %s", s.VolID, err)
return err
}
// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated {
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(volOptions.FsName, volOptions.SubvolumeGroup, &opts)
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil {
log.ErrorLog(
ctx,
"failed to create subvolume group %s, for the vol %s: %s",
volOptions.SubvolumeGroup,
string(volID),
s.SubvolumeGroup,
s.VolID,
err)
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", volOptions.SubvolumeGroup)
clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated = true
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated = true
}
opts := fsAdmin.SubVolumeOptions{
Size: fsAdmin.ByteCount(bytesQuota),
Size: fsAdmin.ByteCount(s.Size),
Mode: modeAllRWX,
}
if volOptions.Pool != "" {
opts.PoolLayout = volOptions.Pool
if s.Pool != "" {
opts.PoolLayout = s.Pool
}
fmt.Println("this is for debugging ")
// FIXME: check if the right credentials are used ("-n", cephEntityClientPrefix + cr.ID)
err = ca.CreateSubVolume(volOptions.FsName, volOptions.SubvolumeGroup, string(volID), &opts)
err = ca.CreateSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, &opts)
if err != nil {
log.ErrorLog(ctx, "failed to create subvolume %s in fs %s: %s", string(volID), volOptions.FsName, err)
log.ErrorLog(ctx, "failed to create subvolume %s in fs %s: %s", s.VolID, s.FsName, err)
return err
}
@ -193,16 +251,16 @@ func CreateVolume(ctx context.Context, volOptions *VolumeOptions, volID fsutil.V
// ExpandVolume will expand the volume if the requested size is greater than
// the subvolume size.
func (vo *VolumeOptions) ExpandVolume(ctx context.Context, volID fsutil.VolumeID, bytesQuota int64) error {
func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) error {
// get the subvolume size for comparison with the requested size.
info, err := vo.GetSubVolumeInfo(ctx, volID)
info, err := s.GetSubVolumeInfo(ctx)
if err != nil {
return err
}
// resize if the requested size is greater than the current size.
if vo.Size > info.BytesQuota {
log.DebugLog(ctx, "clone %s size %d is greater than requested size %d", volID, info.BytesQuota, bytesQuota)
err = vo.ResizeVolume(ctx, volID, bytesQuota)
if s.Size > info.BytesQuota {
log.DebugLog(ctx, "clone %s size %d is greater than requested size %d", s.VolID, info.BytesQuota, bytesQuota)
err = s.ResizeVolume(ctx, bytesQuota)
}
return err
@ -211,45 +269,47 @@ func (vo *VolumeOptions) ExpandVolume(ctx context.Context, volID fsutil.VolumeID
// ResizeVolume will try to use ceph fs subvolume resize command to resize the
// subvolume. If the command is not available as a fallback it will use
// CreateVolume to resize the subvolume.
func (vo *VolumeOptions) ResizeVolume(ctx context.Context, volID fsutil.VolumeID, bytesQuota int64) error {
func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error {
// keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo
var keyPresent bool
// verify if corresponding ClusterID key is present in the map,
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent = clusterAdditionalInfo[vo.ClusterID]; !keyPresent {
clusterAdditionalInfo[vo.ClusterID] = &localClusterState{}
if _, keyPresent = clusterAdditionalInfo[s.clusterID]; !keyPresent {
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[vo.ClusterID].resizeState == unknown ||
clusterAdditionalInfo[vo.ClusterID].resizeState == supported {
fsa, err := vo.conn.GetFSAdmin()
if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState == supported {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", vo.FsName, err)
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err)
return err
}
_, err = fsa.ResizeSubVolume(vo.FsName, vo.SubvolumeGroup, string(volID), fsAdmin.ByteCount(bytesQuota), true)
_, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[vo.ClusterID].resizeState = supported
clusterAdditionalInfo[s.clusterID].resizeState = supported
return nil
}
var invalid fsAdmin.NotImplementedError
// In case the error is other than invalid command return error to the caller.
if !errors.As(err, &invalid) {
log.ErrorLog(ctx, "failed to resize subvolume %s in fs %s: %s", string(volID), vo.FsName, err)
log.ErrorLog(ctx, "failed to resize subvolume %s in fs %s: %s", s.VolID, s.FsName, err)
return err
}
}
clusterAdditionalInfo[vo.ClusterID].resizeState = unsupported
clusterAdditionalInfo[s.clusterID].resizeState = unsupported
s.Size = bytesQuota
return CreateVolume(ctx, vo, volID, bytesQuota)
return s.CreateVolume(ctx)
}
func (vo *VolumeOptions) PurgeVolume(ctx context.Context, volID fsutil.VolumeID, force bool) error {
fsa, err := vo.conn.GetFSAdmin()
// PurgSubVolume removes the subvolume.
func (s *subVolumeClient) PurgeVolume(ctx context.Context, force bool) error {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin %s:", err)
@ -259,13 +319,13 @@ func (vo *VolumeOptions) PurgeVolume(ctx context.Context, volID fsutil.VolumeID,
opt := fsAdmin.SubVolRmFlags{}
opt.Force = force
if checkSubvolumeHasFeature("snapshot-retention", vo.Features) {
if checkSubvolumeHasFeature("snapshot-retention", s.Features) {
opt.RetainSnapshots = true
}
err = fsa.RemoveSubVolumeWithFlags(vo.FsName, vo.SubvolumeGroup, string(volID), opt)
err = fsa.RemoveSubVolumeWithFlags(s.FsName, s.SubvolumeGroup, s.VolID, opt)
if err != nil {
log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", string(volID), vo.FsName, err)
log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", s.VolID, s.FsName, err)
if strings.Contains(err.Error(), cerrors.VolumeNotEmpty) {
return util.JoinErrors(cerrors.ErrVolumeHasSnapshots, err)
}

View File

@ -1,617 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
type VolumeOptions struct {
TopologyPools *[]util.TopologyConstrainedPool
TopologyRequirement *csi.TopologyRequirement
Topology map[string]string
RequestName string
NamePrefix string
Size int64
ClusterID string
FsName string
FscID int64
// ReservedID represents the ID reserved for a subvolume
ReservedID string
MetadataPool string
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
SubvolumeGroup string
Features []string
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection
}
// Connect a CephFS volume to the Ceph cluster.
func (vo *VolumeOptions) Connect(cr *util.Credentials) error {
if vo.conn != nil {
return nil
}
conn := &util.ClusterConnection{}
if err := conn.Connect(vo.Monitors, cr); err != nil {
return err
}
vo.conn = conn
return nil
}
// Destroy cleans up the CephFS volume object and closes the connection to the
// Ceph cluster in case one was setup.
func (vo *VolumeOptions) Destroy() {
if vo.conn != nil {
vo.conn.Destroy()
}
}
func validateNonEmptyField(field, fieldName string) error {
if field == "" {
return fmt.Errorf("parameter '%s' cannot be empty", fieldName)
}
return nil
}
func extractOptionalOption(dest *string, optionLabel string, options map[string]string) error {
opt, ok := options[optionLabel]
if !ok {
// Option not found, no error as it is optional
return nil
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
func extractOption(dest *string, optionLabel string, options map[string]string) error {
opt, ok := options[optionLabel]
if !ok {
return fmt.Errorf("missing required field %s", optionLabel)
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
func validateMounter(m string) error {
switch m {
case "fuse":
case "kernel":
default:
return fmt.Errorf("unknown mounter '%s'. Valid options are 'fuse' and 'kernel'", m)
}
return nil
}
func extractMounter(dest *string, options map[string]string) error {
if err := extractOptionalOption(dest, "mounter", options); err != nil {
return err
}
if *dest != "" {
if err := validateMounter(*dest); err != nil {
return err
}
}
return nil
}
func GetClusterInformation(options map[string]string) (*util.ClusterInfo, error) {
clusterID, ok := options["clusterID"]
if !ok {
err := fmt.Errorf("clusterID must be set")
return nil, err
}
if err := validateNonEmptyField(clusterID, "clusterID"); err != nil {
return nil, err
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s): %w", clusterID, err)
return nil, err
}
subvolumeGroup, err := util.CephFSSubvolumeGroup(util.CsiConfigFile, clusterID)
if err != nil {
err = fmt.Errorf("failed to fetch subvolumegroup using clusterID (%s): %w", clusterID, err)
return nil, err
}
clusterData := &util.ClusterInfo{
ClusterID: clusterID,
Monitors: strings.Split(monitors, ","),
}
clusterData.CephFS.SubvolumeGroup = subvolumeGroup
return clusterData, nil
}
// NewVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters.
func NewVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest,
cr *util.Credentials) (*VolumeOptions, error) {
var (
opts VolumeOptions
err error
)
volOptions := req.GetParameters()
clusterData, err := GetClusterInformation(volOptions)
if err != nil {
return nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
if err = extractOptionalOption(&opts.Pool, "pool", volOptions); err != nil {
return nil, err
}
if err = extractMounter(&opts.Mounter, volOptions); err != nil {
return nil, err
}
if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.NamePrefix, "volumeNamePrefix", volOptions); err != nil {
return nil, err
}
opts.RequestName = requestName
err = opts.Connect(cr)
if err != nil {
return nil, err
}
fs := NewFileSystem(opts.conn)
opts.FscID, err = fs.GetFscID(ctx, opts.FsName)
if err != nil {
return nil, err
}
opts.MetadataPool, err = fs.GetMetadataPool(ctx, opts.FsName)
if err != nil {
return nil, err
}
// store topology information from the request
opts.TopologyPools, opts.TopologyRequirement, err = util.GetTopologyFromRequest(req)
if err != nil {
return nil, err
}
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// CephFS tracker: https://tracker.ceph.com/issues/44277
if opts.TopologyPools != nil {
return nil, errors.New("topology based provisioning is not supported for CephFS backed volumes")
}
opts.ProvisionVolume = true
return &opts, nil
}
// newVolumeOptionsFromVolID generates a new instance of volumeOptions and VolumeIdentifier
// from the provided CSI VolumeID.
func NewVolumeOptionsFromVolID(
ctx context.Context,
volID string,
volOpt, secrets map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions VolumeOptions
vid VolumeIdentifier
)
// Decode the VolID first, to detect older volumes or pre-provisioned volumes
// before other errors
err := vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
return nil, nil, util.JoinErrors(cerrors.ErrInvalidVolID, err)
}
volOptions.ClusterID = vi.ClusterID
vid.VolumeID = volID
volOptions.FscID = vi.LocationID
if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil {
return nil, nil, fmt.Errorf("failed to fetch monitor list using clusterID (%s): %w", vi.ClusterID, err)
}
if volOptions.SubvolumeGroup, err = util.CephFSSubvolumeGroup(util.CsiConfigFile, vi.ClusterID); err != nil {
return nil, nil, fmt.Errorf("failed to fetch subvolumegroup list using clusterID (%s): %w", vi.ClusterID, err)
}
cr, err := util.NewAdminCredentials(secrets)
if err != nil {
return nil, nil, err
}
defer cr.DeleteCredentials()
err = volOptions.Connect(cr)
if err != nil {
return nil, nil, err
}
// in case of an error, volOptions is not returned, release any
// resources that may have been allocated
defer func() {
if err != nil {
volOptions.Destroy()
}
}()
fs := NewFileSystem(volOptions.conn)
volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID)
if err != nil {
return nil, nil, err
}
volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName)
if err != nil {
return nil, nil, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
}
defer j.Destroy()
imageAttributes, err := j.GetImageAttributes(
ctx, volOptions.MetadataPool, vi.ObjectUUID, false)
if err != nil {
return nil, nil, err
}
volOptions.RequestName = imageAttributes.RequestName
vid.FsSubvolName = imageAttributes.ImageName
if volOpt != nil {
if err = extractOptionalOption(&volOptions.Pool, "pool", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.KernelMountOptions, "kernelMountOptions", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.FuseMountOptions, "fuseMountOptions", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.SubvolumeGroup, "subvolumeGroup", volOpt); err != nil {
return nil, nil, err
}
if err = extractMounter(&volOptions.Mounter, volOpt); err != nil {
return nil, nil, err
}
}
volOptions.ProvisionVolume = true
info, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(vid.FsSubvolName))
if err == nil {
volOptions.RootPath = info.Path
volOptions.Features = info.Features
volOptions.Size = info.BytesQuota
}
if errors.Is(err, cerrors.ErrInvalidCommand) {
volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName))
}
return &volOptions, &vid, err
}
// NewVolumeOptionsFromMonitorList generates a new instance of VolumeOptions and
// VolumeIdentifier from the provided CSI volume context.
func NewVolumeOptionsFromMonitorList(
volID string,
options, secrets map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
opts VolumeOptions
vid VolumeIdentifier
provisionVolumeBool string
err error
)
// Check if monitors is part of the options
if err = extractOption(&opts.Monitors, "monitors", options); err != nil {
return nil, nil, err
}
// check if there are mon values in secret and if so override option retrieved monitors from
// monitors in the secret
mon, err := util.GetMonValFromSecret(secrets)
if err == nil && len(mon) > 0 {
opts.Monitors = mon
}
if err = extractOption(&provisionVolumeBool, "provisionVolume", options); err != nil {
return nil, nil, err
}
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
return nil, nil, fmt.Errorf("failed to parse provisionVolume: %w", err)
}
if opts.ProvisionVolume {
if err = extractOption(&opts.Pool, "pool", options); err != nil {
return nil, nil, err
}
opts.RootPath = GetVolumeRootPathCephDeprecated(fsutil.VolumeID(volID))
} else {
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = volID
vid.VolumeID = volID
return &opts, &vid, nil
}
// NewVolumeOptionsFromStaticVolume generates a new instance of volumeOptions and
// VolumeIdentifier from the provided CSI volume context, if the provided context is
// detected to be a statically provisioned volume.
func NewVolumeOptionsFromStaticVolume(
volID string,
options map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
opts VolumeOptions
vid VolumeIdentifier
staticVol bool
err error
)
val, ok := options["staticVolume"]
if !ok {
return nil, nil, cerrors.ErrNonStaticVolume
}
if staticVol, err = strconv.ParseBool(val); err != nil {
return nil, nil, fmt.Errorf("failed to parse preProvisionedVolume: %w", err)
}
if !staticVol {
return nil, nil, cerrors.ErrNonStaticVolume
}
// Volume is static, and ProvisionVolume carries bool stating if it was provisioned, hence
// store NOT of static boolean
opts.ProvisionVolume = !staticVol
clusterData, err := GetClusterInformation(options)
if err != nil {
return nil, nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
if err = extractOption(&opts.FsName, "fsName", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.SubvolumeGroup, "subvolumeGroup", options); err != nil {
return nil, nil, err
}
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = opts.RootPath
vid.VolumeID = volID
return &opts, &vid, nil
}
// NewSnapshotOptionsFromID generates a new instance of volumeOptions and SnapshotIdentifier
// from the provided CSI VolumeID.
func NewSnapshotOptionsFromID(
ctx context.Context,
snapID string,
cr *util.Credentials) (*VolumeOptions, *SnapshotInfo, *SnapshotIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions VolumeOptions
sid SnapshotIdentifier
)
// Decode the snapID first, to detect pre-provisioned snapshot before other errors
err := vi.DecomposeCSIID(snapID)
if err != nil {
return &volOptions, nil, &sid, cerrors.ErrInvalidVolID
}
volOptions.ClusterID = vi.ClusterID
sid.SnapshotID = snapID
volOptions.FscID = vi.LocationID
if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil {
return &volOptions, nil, &sid, fmt.Errorf(
"failed to fetch monitor list using clusterID (%s): %w",
vi.ClusterID,
err)
}
if volOptions.SubvolumeGroup, err = util.CephFSSubvolumeGroup(util.CsiConfigFile, vi.ClusterID); err != nil {
return &volOptions, nil, &sid, fmt.Errorf(
"failed to fetch subvolumegroup list using clusterID (%s): %w",
vi.ClusterID,
err)
}
err = volOptions.Connect(cr)
if err != nil {
return &volOptions, nil, &sid, err
}
// in case of an error, volOptions is returned, but callers may not
// expect to need to call Destroy() on it. So, make sure to release any
// resources that may have been allocated
defer func() {
if err != nil {
volOptions.Destroy()
}
}()
fs := NewFileSystem(volOptions.conn)
volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID)
if err != nil {
return &volOptions, nil, &sid, err
}
volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName)
if err != nil {
return &volOptions, nil, &sid, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return &volOptions, nil, &sid, err
}
defer j.Destroy()
imageAttributes, err := j.GetImageAttributes(
ctx, volOptions.MetadataPool, vi.ObjectUUID, true)
if err != nil {
return &volOptions, nil, &sid, err
}
// storing request name in snapshot Identifier
sid.RequestName = imageAttributes.RequestName
sid.FsSnapshotName = imageAttributes.ImageName
sid.FsSubvolName = imageAttributes.SourceName
subvolInfo, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(sid.FsSubvolName))
if err != nil {
return &volOptions, nil, &sid, err
}
volOptions.Features = subvolInfo.Features
volOptions.Size = subvolInfo.BytesQuota
info, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(sid.FsSubvolName))
if err != nil {
return &volOptions, nil, &sid, err
}
return &volOptions, &info, &sid, nil
}
func GenSnapFromOptions(ctx context.Context, req *csi.CreateSnapshotRequest) (snap *CephfsSnapshot, err error) {
cephfsSnap := &CephfsSnapshot{}
cephfsSnap.RequestName = req.GetName()
snapOptions := req.GetParameters()
clusterID, err := util.GetClusterID(snapOptions)
if err != nil {
return nil, err
}
cephfsSnap.Monitors, cephfsSnap.ClusterID, err = util.GetMonsAndClusterID(ctx, clusterID, false)
if err != nil {
log.ErrorLog(ctx, "failed getting mons (%s)", err)
return nil, err
}
if namePrefix, ok := snapOptions["snapshotNamePrefix"]; ok {
cephfsSnap.NamePrefix = namePrefix
}
return cephfsSnap, nil
}