cleanup: move core functions to core pkg

As we are refactoring the cephfs code, move all
the core functions to a new folder/pkg called
core. This will make things easier to implement.
From now on, all core functionality will be
added to the core package.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
Madhu Rajanna
2021-09-16 19:17:57 +05:30
committed by mergify[bot]
parent 64ade1d4c3
commit b1ef842640
15 changed files with 320 additions and 285 deletions
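
For reference, a minimal sketch of how a caller elsewhere in internal/cephfs would now reach these helpers through the new package. The import path github.com/ceph/ceph-csi/internal/cephfs/core and the deleteSubvolume helper are illustrative assumptions and are not part of this commit:

package cephfs
import (
	"context"
	"github.com/ceph/ceph-csi/internal/cephfs/core"
	fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
)
// deleteSubvolume is a hypothetical caller; it only shows that the formerly
// package-local helpers are now addressed as core.<Name>.
func deleteSubvolume(ctx context.Context, volID string, secrets map[string]string) error {
	volOptions, vid, err := core.NewVolumeOptionsFromVolID(ctx, volID, nil, secrets)
	if err != nil {
		return err
	}
	defer volOptions.Destroy()
	if err := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vid.FsSubvolName), false); err != nil {
		return err
	}
	return core.UndoVolReservation(ctx, volOptions, *vid, secrets)
}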


@@ -0,0 +1,264 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
// cephFSCloneState describes the status of the clone.
type cephFSCloneState string
const (
// cephFSCloneError indicates that fetching the clone state returned an error.
cephFSCloneError = cephFSCloneState("")
// cephFSCloneFailed indicates that clone is in failed state.
cephFSCloneFailed = cephFSCloneState("failed")
// cephFSClonePending indicates that clone is in pending state.
cephFSClonePending = cephFSCloneState("pending")
// cephFSCloneInprogress indicates that clone is in in-progress state.
cephFSCloneInprogress = cephFSCloneState("in-progress")
// cephFSCloneComplete indicates that clone is in complete state.
cephFSCloneComplete = cephFSCloneState("complete")
// SnapshotIsProtected string indicates that the snapshot is currently protected.
SnapshotIsProtected = "yes"
)
// toError returns the error (if any) that corresponds to the clone state; cephFSCloneComplete maps to nil.
func (cs cephFSCloneState) toError() error {
switch cs {
case cephFSCloneComplete:
return nil
case cephFSCloneError:
return cerrors.ErrInvalidClone
case cephFSCloneInprogress:
return cerrors.ErrCloneInProgress
case cephFSClonePending:
return cerrors.ErrClonePending
case cephFSCloneFailed:
return cerrors.ErrCloneFailed
}
return nil
}
func CreateCloneFromSubvolume(
ctx context.Context,
volID, cloneID fsutil.VolumeID,
volOpt,
parentvolOpt *VolumeOptions) error {
snapshotID := cloneID
err := parentvolOpt.CreateSnapshot(ctx, snapshotID, volID)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err)
return err
}
var (
// if protectErr is not nil we will delete the snapshot as the protect fails
protectErr error
// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
cloneErr error
)
defer func() {
if protectErr != nil {
err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
if cloneErr != nil {
if err = volOpt.PurgeVolume(ctx, cloneID, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", cloneID, err)
}
if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil {
// If the snapshot is already unprotected, we get an ErrSnapProtectionExist
// error code. In that case we can safely discard the error and go ahead
// with the deletion.
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = parentvolOpt.ProtectSnapshot(ctx, snapshotID, volID)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)
return protectErr
}
cloneErr = parentvolOpt.cloneSnapshot(ctx, volID, snapshotID, cloneID, volOpt)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", volID, snapshotID, cloneID, cloneErr)
return cloneErr
}
cloneState, cloneErr := volOpt.getCloneState(ctx, cloneID)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to get clone state: %v", cloneErr)
return cloneErr
}
if cloneState != cephFSCloneComplete {
log.ErrorLog(ctx, "clone %s did not complete: %v", cloneID, cloneState.toError())
return cloneState.toError()
}
// This is a workaround to fix the sizing issue for cloned volumes.
err = volOpt.ResizeVolume(ctx, cloneID, volOpt.Size)
if err != nil {
log.ErrorLog(ctx, "failed to expand volume %s: %v", cloneID, err)
return err
}
// As we completed clone, remove the intermediate snap
if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil {
// If the snapshot is already unprotected, we get an ErrSnapProtectionExist
// error code. In that case we can safely discard the error and go ahead
// with the deletion.
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
return err
}
}
if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
return err
}
return nil
}
func cleanupCloneFromSubvolumeSnapshot(
ctx context.Context,
volID, cloneID fsutil.VolumeID,
parentVolOpt *VolumeOptions) error {
// snapshot name is same as clone name as we need a name which can be
// identified during PVC-PVC cloning.
snapShotID := cloneID
snapInfo, err := parentVolOpt.GetSnapshotInfo(ctx, snapShotID, volID)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}
return err
}
if snapInfo.Protected == SnapshotIsProtected {
err = parentVolOpt.UnprotectSnapshot(ctx, snapShotID, volID)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)
return err
}
}
err = parentVolOpt.DeleteSnapshot(ctx, snapShotID, volID)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)
return err
}
return nil
}
func CreateCloneFromSnapshot(
ctx context.Context,
parentVolOpt, volOptions *VolumeOptions,
vID *VolumeIdentifier,
sID *SnapshotIdentifier) error {
snapID := fsutil.VolumeID(sID.FsSnapshotName)
err := parentVolOpt.cloneSnapshot(
ctx,
fsutil.VolumeID(sID.FsSubvolName),
snapID,
fsutil.VolumeID(vID.FsSubvolName),
volOptions)
if err != nil {
return err
}
defer func() {
if err != nil {
if !cerrors.IsCloneRetryError(err) {
if dErr := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), true); dErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr)
}
}
}
}()
cloneState, err := volOptions.getCloneState(ctx, fsutil.VolumeID(vID.FsSubvolName))
if err != nil {
log.ErrorLog(ctx, "failed to get clone state: %v", err)
return err
}
if cloneState != cephFSCloneComplete {
return cloneState.toError()
}
// The cloned volume currently does not reflect the proper size due to an issue in
// cephfs. This is being addressed in cephfs, after which the parent volume size will
// be reflected in the new cloned volume too. Until then we explicitly set the size.
err = volOptions.ResizeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), volOptions.Size)
if err != nil {
log.ErrorLog(ctx, "failed to expand volume %s with error: %v", vID.FsSubvolName, err)
return err
}
return nil
}
func (vo *VolumeOptions) getCloneState(ctx context.Context, volID fsutil.VolumeID) (cephFSCloneState, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(
ctx,
"could not get FSAdmin, can get clone status for volume %s with ID %s: %v",
vo.FsName,
string(volID),
err)
return cephFSCloneError, err
}
cs, err := fsa.CloneStatus(vo.FsName, vo.SubvolumeGroup, string(volID))
if err != nil {
log.ErrorLog(ctx, "could not get clone state for volume %s with ID %s: %v", vo.FsName, string(volID), err)
return cephFSCloneError, err
}
return cephFSCloneState(cs.State), nil
}
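
A hedged sketch of how a caller would drive CreateCloneFromSubvolume and the sentinel errors above; it is assumed to live in package core and reuse this file's imports, and createCloneAndWait is an illustrative name, not something added by this commit:

// createCloneAndWait is hypothetical; the real call site is the cephfs
// controller server, which is not part of this diff.
func createCloneAndWait(
	ctx context.Context,
	volID, cloneID fsutil.VolumeID,
	volOpt, parentVolOpt *VolumeOptions) error {
	err := CreateCloneFromSubvolume(ctx, volID, cloneID, volOpt, parentVolOpt)
	switch {
	case err == nil:
		return nil
	case cerrors.IsCloneRetryError(err):
		// ErrCloneInProgress / ErrClonePending: return the error to the CO
		// so the CreateVolume request is retried and the clone can finish.
		return err
	default:
		// any other error is terminal for this attempt.
		return err
	}
}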


@@ -0,0 +1,39 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"testing"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/stretchr/testify/assert"
)
func TestCloneStateToError(t *testing.T) {
t.Parallel()
errorState := make(map[cephFSCloneState]error)
errorState[cephFSCloneComplete] = nil
errorState[cephFSCloneError] = cerrors.ErrInvalidClone
errorState[cephFSCloneInprogress] = cerrors.ErrCloneInProgress
errorState[cephFSClonePending] = cerrors.ErrClonePending
errorState[cephFSCloneFailed] = cerrors.ErrCloneFailed
for state, err := range errorState {
assert.Equal(t, state.toError(), err)
}
}


@@ -0,0 +1,100 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
func (vo *VolumeOptions) getFscID(ctx context.Context) (int64, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch filesystem ID for %s:", vo.FsName, err)
return 0, err
}
volumes, err := fsa.EnumerateVolumes()
if err != nil {
log.ErrorLog(ctx, "could not list volumes, can not fetch filesystem ID for %s:", vo.FsName, err)
return 0, err
}
for _, vol := range volumes {
if vol.Name == vo.FsName {
return vol.ID, nil
}
}
log.ErrorLog(ctx, "failed to list volume %s", vo.FsName)
return 0, cerrors.ErrVolumeNotFound
}
func (vo *VolumeOptions) getMetadataPool(ctx context.Context) (string, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", vo.FsName, err)
return "", err
}
fsPoolInfos, err := fsa.ListFileSystems()
if err != nil {
log.ErrorLog(ctx, "could not list filesystems, can not fetch metadata pool for %s:", vo.FsName, err)
return "", err
}
for _, fspi := range fsPoolInfos {
if fspi.Name == vo.FsName {
return fspi.MetadataPool, nil
}
}
return "", fmt.Errorf("%w: could not find metadata pool for %s", util.ErrPoolNotFound, vo.FsName)
}
func (vo *VolumeOptions) getFsName(ctx context.Context) (string, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch filesystem name for ID %d:", vo.FscID, err)
return "", err
}
volumes, err := fsa.EnumerateVolumes()
if err != nil {
log.ErrorLog(ctx, "could not list volumes, can not fetch filesystem name for ID %d:", vo.FscID, err)
return "", err
}
for _, vol := range volumes {
if vol.ID == vo.FscID {
return vol.Name, nil
}
}
return "", fmt.Errorf("%w: fscID (%d) not found in Ceph cluster", util.ErrPoolNotFound, vo.FscID)
}


@@ -0,0 +1,436 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/golang/protobuf/ptypes/timestamp"
)
var (
// VolJournal is used to maintain RADOS based journals for CO generated
// VolumeName to backing CephFS subvolumes.
VolJournal *journal.Config
// SnapJournal is used to maintain RADOS based journals for CO generated
// SnapshotName to backing CephFS subvolumes.
SnapJournal *journal.Config
)
// VolumeIdentifier structure contains an association between the CSI VolumeID to its subvolume
// name on the backing CephFS instance.
type VolumeIdentifier struct {
FsSubvolName string
VolumeID string
}
type SnapshotIdentifier struct {
FsSnapshotName string
SnapshotID string
RequestName string
CreationTime *timestamp.Timestamp
FsSubvolName string
}
/*
CheckVolExists checks whether the RequestName passed in volOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.
These functions also clean up omap reservations that are stale, i.e. when omap entries exist
while the backing subvolumes are missing, or when one of the omaps exists and the next is
missing. Because the order of omap creation and deletion is the inverse of each other and both
are protected by the request name lock, any stale omaps are leftovers from incomplete
transactions and are hence safe to garbage collect.
*/
// nolint:gocognit,gocyclo,nestif,cyclop // TODO: reduce complexity
func CheckVolExists(ctx context.Context,
volOptions,
parentVolOpt *VolumeOptions,
pvID *VolumeIdentifier,
sID *SnapshotIdentifier,
cr *util.Credentials) (*VolumeIdentifier, error) {
var vid VolumeIdentifier
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
if imageData == nil {
return nil, nil
}
imageUUID := imageData.ImageUUID
vid.FsSubvolName = imageData.ImageAttributes.ImageName
if sID != nil || pvID != nil {
cloneState, cloneStateErr := volOptions.getCloneState(ctx, fsutil.VolumeID(vid.FsSubvolName))
if cloneStateErr != nil {
if errors.Is(cloneStateErr, cerrors.ErrVolumeNotFound) {
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
if cloneState == cephFSCloneInprogress {
return nil, cerrors.ErrCloneInProgress
}
if cloneState == cephFSClonePending {
return nil, cerrors.ErrClonePending
}
if cloneState == cephFSCloneFailed {
err = volOptions.PurgeVolume(ctx, fsutil.VolumeID(vid.FsSubvolName), true)
if err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vid.FsSubvolName, err)
return nil, err
}
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
if cloneState != cephFSCloneComplete {
return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName)
}
}
volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName))
if err != nil {
if errors.Is(err, cerrors.ErrVolumeNotFound) {
// If the subvolume is not present, cleanup the stale snapshot
// created for clone.
if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx,
fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
// check if topology constraints match what is found
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// TODO: CephFS topology support is postponed till we get the same
// TODO: size checks
// found a volume already available, process and return it!
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx,
fsutil.VolumeID(pvID.FsSubvolName),
fsutil.VolumeID(vid.FsSubvolName),
parentVolOpt)
if err != nil {
return nil, err
}
}
return &vid, nil
}
// UndoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName.
func UndoVolReservation(
ctx context.Context,
volOptions *VolumeOptions,
vid VolumeIdentifier,
secret map[string]string) error {
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return err
}
defer cr.DeleteCredentials()
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
defer j.Destroy()
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return err
}
func updateTopologyConstraints(volOpts *VolumeOptions) error {
// update request based on topology constrained parameters (if present)
poolName, _, topology, err := util.FindPoolAndTopology(volOpts.TopologyPools, volOpts.TopologyRequirement)
if err != nil {
return err
}
if poolName != "" {
volOpts.Pool = poolName
volOpts.Topology = topology
}
return nil
}
// ReserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and,
// to generate the volume identifier for the reserved UUID.
func ReserveVol(ctx context.Context, volOptions *VolumeOptions, secret map[string]string) (*VolumeIdentifier, error) {
var (
vid VolumeIdentifier
imageUUID string
err error
)
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return nil, err
}
defer cr.DeleteCredentials()
err = updateTopologyConstraints(volOptions)
if err != nil {
return nil, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageUUID, vid.FsSubvolName, err = j.ReserveName(
ctx, volOptions.MetadataPool, util.InvalidPoolID,
volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName,
volOptions.NamePrefix, "", "", volOptions.ReservedID, "")
if err != nil {
return nil, err
}
// generate the volume ID to return to the CO system
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Generated Volume ID (%s) and subvolume name (%s) for request name (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
return &vid, nil
}
// ReserveSnap is a helper routine to request a UUID reservation for the CSI SnapName and,
// to generate the snapshot identifier for the reserved UUID.
func ReserveSnap(
ctx context.Context,
volOptions *VolumeOptions,
parentSubVolName string,
snap *CephfsSnapshot,
cr *util.Credentials) (*SnapshotIdentifier, error) {
var (
vid SnapshotIdentifier
imageUUID string
err error
)
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
imageUUID, vid.FsSnapshotName, err = j.ReserveName(
ctx, volOptions.MetadataPool, util.InvalidPoolID,
volOptions.MetadataPool, util.InvalidPoolID, snap.RequestName,
snap.NamePrefix, parentSubVolName, "", snap.ReservedID, "")
if err != nil {
return nil, err
}
// generate the snapshot ID to return to the CO system
vid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Generated Snapshot ID (%s) for request name (%s)",
vid.SnapshotID, snap.RequestName)
return &vid, nil
}
// UndoSnapReservation is a helper routine to undo a name reservation for a CSI SnapshotName.
func UndoSnapReservation(
ctx context.Context,
volOptions *VolumeOptions,
vid SnapshotIdentifier,
snapName string,
cr *util.Credentials) error {
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
defer j.Destroy()
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSnapshotName, snapName)
return err
}
/*
CheckSnapExists checks whether the RequestName passed in volOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.
These functions also clean up omap reservations that are stale, i.e. when omap entries exist
while the backing subvolumes are missing, or when one of the omaps exists and the next is
missing. Because the order of omap creation and deletion is the inverse of each other and both
are protected by the request name lock, any stale omaps are leftovers from incomplete
transactions and are hence safe to garbage collect.
*/
func CheckSnapExists(
ctx context.Context,
volOptions *VolumeOptions,
parentSubVolName string,
snap *CephfsSnapshot,
cr *util.Credentials) (*SnapshotIdentifier, *SnapshotInfo, error) {
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
}
defer j.Destroy()
snapData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, snap.RequestName, snap.NamePrefix, parentSubVolName, "")
if err != nil {
return nil, nil, err
}
if snapData == nil {
return nil, nil, nil
}
sid := &SnapshotIdentifier{}
snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapInfo, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName))
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)
return nil, nil, err
}
return nil, nil, err
}
defer func() {
if err != nil {
err = volOptions.DeleteSnapshot(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName))
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", snapID, err)
return
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)
if err != nil {
log.ErrorLog(ctx, "removing reservation failed for snapshot %s: %v", snapID, err)
}
}
}()
tm, err := fsutil.ParseTime(ctx, snapInfo.CreatedAt)
if err != nil {
return nil, nil, err
}
sid.CreationTime = tm
// found a snapshot already available, process and return it!
sid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, snapUUID, fsutil.VolIDVersion)
if err != nil {
return nil, nil, err
}
log.DebugLog(ctx, "Found existing snapshot (%s) with subvolume name (%s) for request (%s)",
sid.FsSnapshotName, parentSubVolName, snapData.ImageAttributes.RequestName)
return sid, &snapInfo, nil
}
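
A hedged sketch, assumed to sit in package core and reuse this file's imports, of the reserve/create/undo ordering that the omap journal notes above call for; createReservedSubvolume and its policy are illustrative only, the real orchestration lives in the cephfs controller server:

// createReservedSubvolume is hypothetical. It must be called with the
// request-name lock held, as described in the CheckVolExists comment.
func createReservedSubvolume(
	ctx context.Context,
	volOptions *VolumeOptions,
	secret map[string]string,
	cr *util.Credentials) (*VolumeIdentifier, error) {
	// return an existing reservation (or a retryable clone error) if found.
	if vid, err := CheckVolExists(ctx, volOptions, nil, nil, nil, cr); err != nil || vid != nil {
		return vid, err
	}
	vid, err := ReserveVol(ctx, volOptions, secret)
	if err != nil {
		return nil, err
	}
	if err = CreateVolume(ctx, volOptions, fsutil.VolumeID(vid.FsSubvolName), volOptions.Size); err != nil {
		// undo the omap reservation so a retry starts from a clean state.
		if undoErr := UndoVolReservation(ctx, volOptions, *vid, secret); undoErr != nil {
			log.ErrorLog(ctx, "failed to undo reservation for %s: %v", vid.FsSubvolName, undoErr)
		}
		return nil, err
	}
	return vid, nil
}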


@@ -0,0 +1,233 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"time"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
"github.com/ceph/go-ceph/rados"
"github.com/golang/protobuf/ptypes/timestamp"
)
// autoProtect points to the snapshot auto-protect feature of
// the subvolume.
const (
autoProtect = "snapshot-autoprotect"
)
// CephfsSnapshot represents a CSI snapshot and its cluster information.
type CephfsSnapshot struct {
NamePrefix string
Monitors string
// MetadataPool & Pool fields are not used atm. But its definitely good to have it in this struct
// so keeping it here
MetadataPool string
Pool string
ClusterID string
RequestName string
// ReservedID represents the ID reserved for a snapshot
ReservedID string
}
func (vo *VolumeOptions) CreateSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.CreateSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
if err != nil {
log.ErrorLog(ctx, "failed to create subvolume snapshot %s %s in fs %s: %s",
string(snapID), string(volID), vo.FsName, err)
return err
}
return nil
}
func (vo *VolumeOptions) DeleteSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.ForceRemoveSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
if err != nil {
log.ErrorLog(ctx, "failed to delete subvolume snapshot %s %s in fs %s: %s",
string(snapID), string(volID), vo.FsName, err)
return err
}
return nil
}
type SnapshotInfo struct {
CreatedAt time.Time
CreationTime *timestamp.Timestamp
HasPendingClones string
Protected string
}
func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsutil.VolumeID) (SnapshotInfo, error) {
snap := SnapshotInfo{}
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return snap, err
}
info, err := fsa.SubVolumeSnapshotInfo(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
return snap, cerrors.ErrSnapNotFound
}
log.ErrorLog(
ctx,
"failed to get subvolume snapshot info %s %s in fs %s with error %s",
string(volID),
string(snapID),
vo.FsName,
err)
return snap, err
}
snap.CreatedAt = info.CreatedAt.Time
snap.HasPendingClones = info.HasPendingClones
snap.Protected = info.Protected
return snap, nil
}
func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, vo.Features) {
return nil
}
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.ProtectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
string(snapID))
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
vo.FsName,
err)
return err
}
return nil
}
func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, vo.Features) {
return nil
}
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.UnprotectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
string(snapID))
if err != nil {
// If the snapshot is already unprotected, we get an ErrSnapProtectionExist
// error code. In that case we can safely discard the error.
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
vo.FsName,
err)
return err
}
return nil
}
func (vo *VolumeOptions) cloneSnapshot(
ctx context.Context,
volID, snapID, cloneID fsutil.VolumeID,
cloneVolOptions *VolumeOptions,
) error {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
co := &admin.CloneOptions{
TargetGroup: cloneVolOptions.SubvolumeGroup,
}
if cloneVolOptions.Pool != "" {
co.PoolLayout = cloneVolOptions.Pool
}
err = fsa.CloneSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID), string(cloneID), co)
if err != nil {
log.ErrorLog(
ctx,
"failed to clone subvolume snapshot %s %s in fs %s with error: %s",
string(volID),
string(snapID),
string(cloneID),
vo.FsName,
err)
if errors.Is(err, rados.ErrNotFound) {
return cerrors.ErrVolumeNotFound
}
return err
}
return nil
}
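
A short sketch of the snapshot lifecycle these methods implement, assumed to live in package core and reuse this file's imports; takeAndRemoveSnapshot is an illustrative name, and on clusters exposing the snapshot-autoprotect feature the protect/unprotect steps are no-ops as noted above:

// takeAndRemoveSnapshot is hypothetical; it only strings the methods above
// together: create, protect, unprotect and delete a subvolume snapshot.
func takeAndRemoveSnapshot(ctx context.Context, vo *VolumeOptions, snapID, volID fsutil.VolumeID) error {
	if err := vo.CreateSnapshot(ctx, snapID, volID); err != nil {
		return err
	}
	// no-op when the subvolume reports the "snapshot-autoprotect" feature.
	if err := vo.ProtectSnapshot(ctx, snapID, volID); err != nil {
		return err
	}
	if err := vo.UnprotectSnapshot(ctx, snapID, volID); err != nil {
		return err
	}
	return vo.DeleteSnapshot(ctx, snapID, volID)
}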


@@ -0,0 +1,279 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
"path"
"strings"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
fsAdmin "github.com/ceph/go-ceph/cephfs/admin"
"github.com/ceph/go-ceph/rados"
)
// clusterAdditionalInfo contains information on whether resize is
// supported in the particular cluster and whether the subvolumegroup
// has been created.
// Subvolumegroup creation and volume resize decisions are
// taken based on this additional cluster information.
var clusterAdditionalInfo = make(map[string]*localClusterState)
const (
cephEntityClientPrefix = "client."
// modeAllRWX can be used for setting permissions to Read-Write-eXecute
// for User, Group and Other.
modeAllRWX = 0o777
)
// Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo.
type Subvolume struct {
BytesQuota int64
Path string
Features []string
}
func GetVolumeRootPathCephDeprecated(volID fsutil.VolumeID) string {
return path.Join("/", "csi-volumes", string(volID))
}
func (vo *VolumeOptions) GetVolumeRootPathCeph(ctx context.Context, volID fsutil.VolumeID) (string, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin err %s", err)
return "", err
}
svPath, err := fsa.SubVolumePath(vo.FsName, vo.SubvolumeGroup, string(volID))
if err != nil {
log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", string(volID), err)
if errors.Is(err, rados.ErrNotFound) {
return "", util.JoinErrors(cerrors.ErrVolumeNotFound, err)
}
return "", err
}
return svPath, nil
}
func (vo *VolumeOptions) GetSubVolumeInfo(ctx context.Context, volID fsutil.VolumeID) (*Subvolume, error) {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", vo.FsName, err)
return nil, err
}
info, err := fsa.SubVolumeInfo(vo.FsName, vo.SubvolumeGroup, string(volID))
if err != nil {
log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", string(volID), err)
if errors.Is(err, rados.ErrNotFound) {
return nil, cerrors.ErrVolumeNotFound
}
// In case the error is invalid command return error to the caller.
var invalid fsAdmin.NotImplementedError
if errors.As(err, &invalid) {
return nil, cerrors.ErrInvalidCommand
}
return nil, err
}
subvol := Subvolume{
// only set BytesQuota when it is of type ByteCount
Path: info.Path,
Features: make([]string, len(info.Features)),
}
bc, ok := info.BytesQuota.(fsAdmin.ByteCount)
if !ok {
// If info.BytesQuota == Infinite (in case it is not set)
// or nil (in case the subvolume is in snapshot-retained state),
// just continue without returning quota information.
if !(info.BytesQuota == fsAdmin.Infinite || info.State == fsAdmin.StateSnapRetained) {
return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", string(volID), info.BytesQuota)
}
} else {
subvol.BytesQuota = int64(bc)
}
for i, feature := range info.Features {
subvol.Features[i] = string(feature)
}
return &subvol, nil
}
type operationState int64
const (
unknown operationState = iota
supported
unsupported
)
type localClusterState struct {
// set the enum value i.e., unknown, supported,
// unsupported as per the state of the cluster.
resizeState operationState
// set true once a subvolumegroup is created
// for corresponding cluster.
subVolumeGroupCreated bool
}
func CreateVolume(ctx context.Context, volOptions *VolumeOptions, volID fsutil.VolumeID, bytesQuota int64) error {
// verify if corresponding ClusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent := clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent {
clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{}
}
ca, err := volOptions.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not create subvolume %s: %s", string(volID), err)
return err
}
// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(volOptions.FsName, volOptions.SubvolumeGroup, &opts)
if err != nil {
log.ErrorLog(
ctx,
"failed to create subvolume group %s, for the vol %s: %s",
volOptions.SubvolumeGroup,
string(volID),
err)
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", volOptions.SubvolumeGroup)
clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated = true
}
opts := fsAdmin.SubVolumeOptions{
Size: fsAdmin.ByteCount(bytesQuota),
Mode: modeAllRWX,
}
if volOptions.Pool != "" {
opts.PoolLayout = volOptions.Pool
}
// FIXME: check if the right credentials are used ("-n", cephEntityClientPrefix + cr.ID)
err = ca.CreateSubVolume(volOptions.FsName, volOptions.SubvolumeGroup, string(volID), &opts)
if err != nil {
log.ErrorLog(ctx, "failed to create subvolume %s in fs %s: %s", string(volID), volOptions.FsName, err)
return err
}
return nil
}
// ResizeVolume will try to use ceph fs subvolume resize command to resize the
// subvolume. If the command is not available as a fallback it will use
// CreateVolume to resize the subvolume.
func (vo *VolumeOptions) ResizeVolume(ctx context.Context, volID fsutil.VolumeID, bytesQuota int64) error {
// keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo
var keyPresent bool
// verify if corresponding ClusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent = clusterAdditionalInfo[vo.ClusterID]; !keyPresent {
clusterAdditionalInfo[vo.ClusterID] = &localClusterState{}
}
// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[vo.ClusterID].resizeState == unknown ||
clusterAdditionalInfo[vo.ClusterID].resizeState == supported {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", vo.FsName, err)
return err
}
_, err = fsa.ResizeSubVolume(vo.FsName, vo.SubvolumeGroup, string(volID), fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[vo.ClusterID].resizeState = supported
return nil
}
var invalid fsAdmin.NotImplementedError
// In case the error is other than invalid command return error to the caller.
if !errors.As(err, &invalid) {
log.ErrorLog(ctx, "failed to resize subvolume %s in fs %s: %s", string(volID), vo.FsName, err)
return err
}
}
clusterAdditionalInfo[vo.ClusterID].resizeState = unsupported
return CreateVolume(ctx, vo, volID, bytesQuota)
}
func (vo *VolumeOptions) PurgeVolume(ctx context.Context, volID fsutil.VolumeID, force bool) error {
fsa, err := vo.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin %s:", err)
return err
}
opt := fsAdmin.SubVolRmFlags{}
opt.Force = force
if checkSubvolumeHasFeature("snapshot-retention", vo.Features) {
opt.RetainSnapshots = true
}
err = fsa.RemoveSubVolumeWithFlags(vo.FsName, vo.SubvolumeGroup, string(volID), opt)
if err != nil {
log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", string(volID), vo.FsName, err)
if strings.Contains(err.Error(), cerrors.VolumeNotEmpty) {
return util.JoinErrors(cerrors.ErrVolumeHasSnapshots, err)
}
if errors.Is(err, rados.ErrNotFound) {
return util.JoinErrors(cerrors.ErrVolumeNotFound, err)
}
return err
}
return nil
}
// checkSubvolumeHasFeature verifies if the referred subvolume has
// the required feature.
func checkSubvolumeHasFeature(feature string, subVolFeatures []string) bool {
// The subvolume "features" are based on the internal version of the subvolume.
// Verify if subvolume supports the required feature.
for _, subvolFeature := range subVolFeatures {
if subvolFeature == feature {
return true
}
}
return false
}
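
A hedged sketch, assumed to be in package core and reusing this file's imports, of how an expand path could combine GetSubVolumeInfo and ResizeVolume; expandIfNeeded and its skip-if-large-enough policy are illustrative and not taken from this commit:

// expandIfNeeded is hypothetical. ResizeVolume itself falls back to
// CreateVolume on clusters where "ceph fs subvolume resize" is not available.
func expandIfNeeded(ctx context.Context, vo *VolumeOptions, volID fsutil.VolumeID, requestedBytes int64) error {
	info, err := vo.GetSubVolumeInfo(ctx, volID)
	if err != nil {
		return err
	}
	if info.BytesQuota >= requestedBytes {
		// the current quota already satisfies the request; nothing to do.
		return nil
	}
	return vo.ResizeVolume(ctx, volID, requestedBytes)
}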


@@ -0,0 +1,310 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
const (
volumeMounterFuse = "fuse"
volumeMounterKernel = "kernel"
netDev = "_netdev"
)
var (
availableMounters []string
// maps a mountpoint to PID of its FUSE daemon.
fusePidMap = make(map[string]int)
fusePidMapMtx sync.Mutex
fusePidRx = regexp.MustCompile(`(?m)^ceph-fuse\[(.+)\]: starting fuse$`)
// nolint:gomnd // numbers specify Kernel versions.
quotaSupport = []util.KernelVersion{
{
Version: 4,
PatchLevel: 17,
SubLevel: 0,
ExtraVersion: 0, Distribution: "",
Backport: false,
}, // standard 4.17+ versions
{
Version: 3,
PatchLevel: 10,
SubLevel: 0,
ExtraVersion: 1062,
Distribution: ".el7",
Backport: true,
}, // RHEL-7.7
}
)
func execCommandErr(ctx context.Context, program string, args ...string) error {
_, _, err := util.ExecCommand(ctx, program, args...)
return err
}
// Load available ceph mounters installed on system into availableMounters
// Called from driver.go's Run().
func LoadAvailableMounters(conf *util.Config) error {
// #nosec
fuseMounterProbe := exec.Command("ceph-fuse", "--version")
// #nosec
kernelMounterProbe := exec.Command("mount.ceph")
err := kernelMounterProbe.Run()
if err != nil {
log.ErrorLogMsg("failed to run mount.ceph %v", err)
} else {
// fetch the current running kernel info
release, kvErr := util.GetKernelVersion()
if kvErr != nil {
return kvErr
}
if conf.ForceKernelCephFS || util.CheckKernelSupport(release, quotaSupport) {
log.DefaultLog("loaded mounter: %s", volumeMounterKernel)
availableMounters = append(availableMounters, volumeMounterKernel)
} else {
log.DefaultLog("kernel version < 4.17 might not support quota feature, hence not loading kernel client")
}
}
err = fuseMounterProbe.Run()
if err != nil {
log.ErrorLogMsg("failed to run ceph-fuse %v", err)
} else {
log.DefaultLog("loaded mounter: %s", volumeMounterFuse)
availableMounters = append(availableMounters, volumeMounterFuse)
}
if len(availableMounters) == 0 {
return errors.New("no ceph mounters found on system")
}
return nil
}
type VolumeMounter interface {
Mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *VolumeOptions) error
Name() string
}
func NewMounter(volOptions *VolumeOptions) (VolumeMounter, error) {
// Get the mounter from the configuration
wantMounter := volOptions.Mounter
// Verify that it's available
var chosenMounter string
for _, availMounter := range availableMounters {
if availMounter == wantMounter {
chosenMounter = wantMounter
break
}
}
if chosenMounter == "" {
// Otherwise pick whatever is left
chosenMounter = availableMounters[0]
log.DebugLogMsg("requested mounter: %s, chosen mounter: %s", wantMounter, chosenMounter)
}
// Create the mounter
switch chosenMounter {
case volumeMounterFuse:
return &FuseMounter{}, nil
case volumeMounterKernel:
return &KernelMounter{}, nil
}
return nil, fmt.Errorf("unknown mounter '%s'", chosenMounter)
}
type FuseMounter struct{}
func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *VolumeOptions) error {
args := []string{
mountPoint,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix + cr.ID, "--keyfile=" + cr.KeyFile,
"-r", volOptions.RootPath,
}
fmo := "nonempty"
if volOptions.FuseMountOptions != "" {
fmo += "," + strings.TrimSpace(volOptions.FuseMountOptions)
}
args = append(args, "-o", fmo)
if volOptions.FsName != "" {
args = append(args, "--client_mds_namespace="+volOptions.FsName)
}
_, stderr, err := util.ExecCommand(ctx, "ceph-fuse", args[:]...)
if err != nil {
return fmt.Errorf("%w stderr: %s", err, stderr)
}
// Parse the output:
// We need "starting fuse" meaning the mount is ok
// and PID of the ceph-fuse daemon for unmount
match := fusePidRx.FindSubmatch([]byte(stderr))
// validMatchLength is set to 2 as match is expected
// to have 2 items, starting fuse and PID of the fuse daemon
const validMatchLength = 2
if len(match) != validMatchLength {
return fmt.Errorf("ceph-fuse failed: %s", stderr)
}
pid, err := strconv.Atoi(string(match[1]))
if err != nil {
return fmt.Errorf("failed to parse FUSE daemon PID: %w", err)
}
fusePidMapMtx.Lock()
fusePidMap[mountPoint] = pid
fusePidMapMtx.Unlock()
return nil
}
func (m *FuseMounter) Mount(
ctx context.Context,
mountPoint string,
cr *util.Credentials,
volOptions *VolumeOptions) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return mountFuse(ctx, mountPoint, cr, volOptions)
}
func (m *FuseMounter) Name() string { return "Ceph FUSE driver" }
type KernelMounter struct{}
func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *VolumeOptions) error {
if err := execCommandErr(ctx, "modprobe", "ceph"); err != nil {
return err
}
args := []string{
"-t", "ceph",
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint,
}
optionsStr := fmt.Sprintf("name=%s,secretfile=%s", cr.ID, cr.KeyFile)
mdsNamespace := ""
if volOptions.FsName != "" {
mdsNamespace = fmt.Sprintf("mds_namespace=%s", volOptions.FsName)
}
optionsStr = util.MountOptionsAdd(optionsStr, mdsNamespace, volOptions.KernelMountOptions, netDev)
args = append(args, "-o", optionsStr)
_, stderr, err := util.ExecCommand(ctx, "mount", args[:]...)
if err != nil {
return fmt.Errorf("%w stderr: %s", err, stderr)
}
return err
}
func (m *KernelMounter) Mount(
ctx context.Context,
mountPoint string,
cr *util.Credentials,
volOptions *VolumeOptions) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return mountKernel(ctx, mountPoint, cr, volOptions)
}
func (m *KernelMounter) Name() string { return "Ceph kernel client" }
func BindMount(ctx context.Context, from, to string, readOnly bool, mntOptions []string) error {
mntOptionSli := strings.Join(mntOptions, ",")
if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, from, to); err != nil {
return fmt.Errorf("failed to bind-mount %s to %s: %w", from, to, err)
}
if readOnly {
mntOptionSli = util.MountOptionsAdd(mntOptionSli, "remount")
if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, to); err != nil {
return fmt.Errorf("failed read-only remount of %s: %w", to, err)
}
}
return nil
}
func UnmountVolume(ctx context.Context, mountPoint string) error {
if _, stderr, err := util.ExecCommand(ctx, "umount", mountPoint); err != nil {
err = fmt.Errorf("%w stderr: %s", err, stderr)
if strings.Contains(err.Error(), fmt.Sprintf("umount: %s: not mounted", mountPoint)) ||
strings.Contains(err.Error(), "No such file or directory") {
return nil
}
return err
}
fusePidMapMtx.Lock()
pid, ok := fusePidMap[mountPoint]
if ok {
delete(fusePidMap, mountPoint)
}
fusePidMapMtx.Unlock()
if ok {
p, err := os.FindProcess(pid)
if err != nil {
log.WarningLog(ctx, "failed to find process %d: %v", pid, err)
} else {
if _, err = p.Wait(); err != nil {
log.WarningLog(ctx, "%d is not a child process: %v", pid, err)
}
}
}
return nil
}
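
A minimal sketch of the mounter flow this file provides, assumed to be in package core and reusing its imports, with a *VolumeOptions and credentials prepared by the caller; mountVolume is an illustrative name and the real call sites in the node server are not shown in this diff:

// mountVolume is hypothetical; it picks one of the mounters discovered by
// LoadAvailableMounters at driver start-up and mounts the subvolume.
func mountVolume(ctx context.Context, stagingPath string, cr *util.Credentials, volOptions *VolumeOptions) error {
	mounter, err := NewMounter(volOptions)
	if err != nil {
		return err
	}
	log.DebugLog(ctx, "mounting volume %s with %s", volOptions.RootPath, mounter.Name())
	if err = mounter.Mount(ctx, stagingPath, cr, volOptions); err != nil {
		// best-effort cleanup of a partially staged mount point.
		_ = UnmountVolume(ctx, stagingPath)
		return err
	}
	return nil
}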


@@ -0,0 +1,612 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
type VolumeOptions struct {
TopologyPools *[]util.TopologyConstrainedPool
TopologyRequirement *csi.TopologyRequirement
Topology map[string]string
RequestName string
NamePrefix string
Size int64
ClusterID string
FsName string
FscID int64
// ReservedID represents the ID reserved for a subvolume
ReservedID string
MetadataPool string
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
SubvolumeGroup string
Features []string
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection
}
// Connect a CephFS volume to the Ceph cluster.
func (vo *VolumeOptions) Connect(cr *util.Credentials) error {
if vo.conn != nil {
return nil
}
conn := &util.ClusterConnection{}
if err := conn.Connect(vo.Monitors, cr); err != nil {
return err
}
vo.conn = conn
return nil
}
// Destroy cleans up the CephFS volume object and closes the connection to the
// Ceph cluster in case one was setup.
func (vo *VolumeOptions) Destroy() {
if vo.conn != nil {
vo.conn.Destroy()
}
}
func validateNonEmptyField(field, fieldName string) error {
if field == "" {
return fmt.Errorf("parameter '%s' cannot be empty", fieldName)
}
return nil
}
func extractOptionalOption(dest *string, optionLabel string, options map[string]string) error {
opt, ok := options[optionLabel]
if !ok {
// Option not found, no error as it is optional
return nil
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
func extractOption(dest *string, optionLabel string, options map[string]string) error {
opt, ok := options[optionLabel]
if !ok {
return fmt.Errorf("missing required field %s", optionLabel)
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
func validateMounter(m string) error {
switch m {
case volumeMounterFuse:
case volumeMounterKernel:
default:
return fmt.Errorf("unknown mounter '%s'. Valid options are 'fuse' and 'kernel'", m)
}
return nil
}
func extractMounter(dest *string, options map[string]string) error {
if err := extractOptionalOption(dest, "mounter", options); err != nil {
return err
}
if *dest != "" {
if err := validateMounter(*dest); err != nil {
return err
}
}
return nil
}
func GetClusterInformation(options map[string]string) (*util.ClusterInfo, error) {
clusterID, ok := options["clusterID"]
if !ok {
err := fmt.Errorf("clusterID must be set")
return nil, err
}
if err := validateNonEmptyField(clusterID, "clusterID"); err != nil {
return nil, err
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s): %w", clusterID, err)
return nil, err
}
subvolumeGroup, err := util.CephFSSubvolumeGroup(util.CsiConfigFile, clusterID)
if err != nil {
err = fmt.Errorf("failed to fetch subvolumegroup using clusterID (%s): %w", clusterID, err)
return nil, err
}
clusterData := &util.ClusterInfo{
ClusterID: clusterID,
Monitors: strings.Split(monitors, ","),
}
clusterData.CephFS.SubvolumeGroup = subvolumeGroup
return clusterData, nil
}
// NewVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters.
func NewVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest,
cr *util.Credentials) (*VolumeOptions, error) {
var (
opts VolumeOptions
err error
)
volOptions := req.GetParameters()
clusterData, err := GetClusterInformation(volOptions)
if err != nil {
return nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
if err = extractOptionalOption(&opts.Pool, "pool", volOptions); err != nil {
return nil, err
}
if err = extractMounter(&opts.Mounter, volOptions); err != nil {
return nil, err
}
if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.NamePrefix, "volumeNamePrefix", volOptions); err != nil {
return nil, err
}
opts.RequestName = requestName
err = opts.Connect(cr)
if err != nil {
return nil, err
}
opts.FscID, err = opts.getFscID(ctx)
if err != nil {
return nil, err
}
opts.MetadataPool, err = opts.getMetadataPool(ctx)
if err != nil {
return nil, err
}
// store topology information from the request
opts.TopologyPools, opts.TopologyRequirement, err = util.GetTopologyFromRequest(req)
if err != nil {
return nil, err
}
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// CephFS tracker: https://tracker.ceph.com/issues/44277
if opts.TopologyPools != nil {
return nil, errors.New("topology based provisioning is not supported for CephFS backed volumes")
}
opts.ProvisionVolume = true
return &opts, nil
}
// newVolumeOptionsFromVolID generates a new instance of volumeOptions and VolumeIdentifier
// from the provided CSI VolumeID.
func NewVolumeOptionsFromVolID(
ctx context.Context,
volID string,
volOpt, secrets map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions VolumeOptions
vid VolumeIdentifier
)
// Decode the VolID first, to detect older volumes or pre-provisioned volumes
// before other errors
err := vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
return nil, nil, util.JoinErrors(cerrors.ErrInvalidVolID, err)
}
volOptions.ClusterID = vi.ClusterID
vid.VolumeID = volID
volOptions.FscID = vi.LocationID
if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil {
return nil, nil, fmt.Errorf("failed to fetch monitor list using clusterID (%s): %w", vi.ClusterID, err)
}
if volOptions.SubvolumeGroup, err = util.CephFSSubvolumeGroup(util.CsiConfigFile, vi.ClusterID); err != nil {
return nil, nil, fmt.Errorf("failed to fetch subvolumegroup list using clusterID (%s): %w", vi.ClusterID, err)
}
cr, err := util.NewAdminCredentials(secrets)
if err != nil {
return nil, nil, err
}
defer cr.DeleteCredentials()
err = volOptions.Connect(cr)
if err != nil {
return nil, nil, err
}
// in case of an error, volOptions is not returned, release any
// resources that may have been allocated
defer func() {
if err != nil {
volOptions.Destroy()
}
}()
volOptions.FsName, err = volOptions.getFsName(ctx)
if err != nil {
return nil, nil, err
}
volOptions.MetadataPool, err = volOptions.getMetadataPool(ctx)
if err != nil {
return nil, nil, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := VolJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
}
defer j.Destroy()
imageAttributes, err := j.GetImageAttributes(
ctx, volOptions.MetadataPool, vi.ObjectUUID, false)
if err != nil {
return nil, nil, err
}
volOptions.RequestName = imageAttributes.RequestName
vid.FsSubvolName = imageAttributes.ImageName
if volOpt != nil {
if err = extractOptionalOption(&volOptions.Pool, "pool", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.KernelMountOptions, "kernelMountOptions", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.FuseMountOptions, "fuseMountOptions", volOpt); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.SubvolumeGroup, "subvolumeGroup", volOpt); err != nil {
return nil, nil, err
}
if err = extractMounter(&volOptions.Mounter, volOpt); err != nil {
return nil, nil, err
}
}
volOptions.ProvisionVolume = true
info, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(vid.FsSubvolName))
if err == nil {
volOptions.RootPath = info.Path
volOptions.Features = info.Features
}
if errors.Is(err, cerrors.ErrInvalidCommand) {
volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName))
}
return &volOptions, &vid, err
}
// NewVolumeOptionsFromMonitorList generates a new instance of VolumeOptions and
// VolumeIdentifier from the provided CSI volume context.
func NewVolumeOptionsFromMonitorList(
volID string,
options, secrets map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
opts VolumeOptions
vid VolumeIdentifier
provisionVolumeBool string
err error
)
// Check if monitors is part of the options
if err = extractOption(&opts.Monitors, "monitors", options); err != nil {
return nil, nil, err
}
// check if there are mon values in secret and if so override option retrieved monitors from
// monitors in the secret
mon, err := util.GetMonValFromSecret(secrets)
if err == nil && len(mon) > 0 {
opts.Monitors = mon
}
if err = extractOption(&provisionVolumeBool, "provisionVolume", options); err != nil {
return nil, nil, err
}
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
return nil, nil, fmt.Errorf("failed to parse provisionVolume: %w", err)
}
if opts.ProvisionVolume {
if err = extractOption(&opts.Pool, "pool", options); err != nil {
return nil, nil, err
}
opts.RootPath = GetVolumeRootPathCephDeprecated(fsutil.VolumeID(volID))
} else {
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = volID
vid.VolumeID = volID
return &opts, &vid, nil
}
// NewVolumeOptionsFromStaticVolume generates a new instance of volumeOptions and
// VolumeIdentifier from the provided CSI volume context, if the provided context is
// detected to be a statically provisioned volume.
func NewVolumeOptionsFromStaticVolume(
volID string,
options map[string]string) (*VolumeOptions, *VolumeIdentifier, error) {
var (
opts VolumeOptions
vid VolumeIdentifier
staticVol bool
err error
)
val, ok := options["staticVolume"]
if !ok {
return nil, nil, cerrors.ErrNonStaticVolume
}
if staticVol, err = strconv.ParseBool(val); err != nil {
return nil, nil, fmt.Errorf("failed to parse preProvisionedVolume: %w", err)
}
if !staticVol {
return nil, nil, cerrors.ErrNonStaticVolume
}
// Volume is static, and ProvisionVolume carries bool stating if it was provisioned, hence
// store NOT of static boolean
opts.ProvisionVolume = !staticVol
clusterData, err := GetClusterInformation(options)
if err != nil {
return nil, nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
if err = extractOption(&opts.FsName, "fsName", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.FuseMountOptions, "fuseMountOptions", options); err != nil {
return nil, nil, err
}
if err = extractOptionalOption(&opts.SubvolumeGroup, "subvolumeGroup", options); err != nil {
return nil, nil, err
}
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = opts.RootPath
vid.VolumeID = volID
return &opts, &vid, nil
}
// NewSnapshotOptionsFromID generates a new instance of volumeOptions and SnapshotIdentifier
// from the provided CSI VolumeID.
func NewSnapshotOptionsFromID(
ctx context.Context,
snapID string,
cr *util.Credentials) (*VolumeOptions, *SnapshotInfo, *SnapshotIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions VolumeOptions
sid SnapshotIdentifier
)
// Decode the snapID first, to detect pre-provisioned snapshot before other errors
err := vi.DecomposeCSIID(snapID)
if err != nil {
return &volOptions, nil, &sid, cerrors.ErrInvalidVolID
}
volOptions.ClusterID = vi.ClusterID
sid.SnapshotID = snapID
volOptions.FscID = vi.LocationID
if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil {
return &volOptions, nil, &sid, fmt.Errorf(
"failed to fetch monitor list using clusterID (%s): %w",
vi.ClusterID,
err)
}
if volOptions.SubvolumeGroup, err = util.CephFSSubvolumeGroup(util.CsiConfigFile, vi.ClusterID); err != nil {
return &volOptions, nil, &sid, fmt.Errorf(
"failed to fetch subvolumegroup list using clusterID (%s): %w",
vi.ClusterID,
err)
}
err = volOptions.Connect(cr)
if err != nil {
return &volOptions, nil, &sid, err
}
// in case of an error, volOptions is returned, but callers may not
// expect to need to call Destroy() on it. So, make sure to release any
// resources that may have been allocated
defer func() {
if err != nil {
volOptions.Destroy()
}
}()
volOptions.FsName, err = volOptions.getFsName(ctx)
if err != nil {
return &volOptions, nil, &sid, err
}
volOptions.MetadataPool, err = volOptions.getMetadataPool(ctx)
if err != nil {
return &volOptions, nil, &sid, err
}
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return &volOptions, nil, &sid, err
}
defer j.Destroy()
imageAttributes, err := j.GetImageAttributes(
ctx, volOptions.MetadataPool, vi.ObjectUUID, true)
if err != nil {
return &volOptions, nil, &sid, err
}
// storing request name in snapshot Identifier
sid.RequestName = imageAttributes.RequestName
sid.FsSnapshotName = imageAttributes.ImageName
sid.FsSubvolName = imageAttributes.SourceName
subvolInfo, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(sid.FsSubvolName))
if err != nil {
return &volOptions, nil, &sid, err
}
volOptions.Features = subvolInfo.Features
info, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(sid.FsSubvolName))
if err != nil {
return &volOptions, nil, &sid, err
}
return &volOptions, &info, &sid, nil
}
func GenSnapFromOptions(ctx context.Context, req *csi.CreateSnapshotRequest) (snap *CephfsSnapshot, err error) {
cephfsSnap := &CephfsSnapshot{}
cephfsSnap.RequestName = req.GetName()
snapOptions := req.GetParameters()
clusterID, err := util.GetClusterID(snapOptions)
if err != nil {
return nil, err
}
cephfsSnap.Monitors, cephfsSnap.ClusterID, err = util.GetMonsAndClusterID(ctx, clusterID, false)
if err != nil {
log.ErrorLog(ctx, "failed getting mons (%s)", err)
return nil, err
}
if namePrefix, ok := snapOptions["snapshotNamePrefix"]; ok {
cephfsSnap.NamePrefix = namePrefix
}
return cephfsSnap, nil
}
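
Finally, a hedged sketch of how the controller's CreateVolume path could consume the constructors above from outside the package; prepareVolumeOptions, and the core import path as in the first sketch, are illustrative assumptions:

package cephfs
import (
	"context"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/ceph/ceph-csi/internal/cephfs/core"
	"github.com/ceph/ceph-csi/internal/util"
)
// prepareVolumeOptions is hypothetical: it resolves cluster details,
// filesystem ID and metadata pool for the request and returns a connected
// *core.VolumeOptions that the caller must eventually Destroy().
func prepareVolumeOptions(
	ctx context.Context,
	requestName string,
	req *csi.CreateVolumeRequest,
	cr *util.Credentials) (*core.VolumeOptions, error) {
	volOptions, err := core.NewVolumeOptions(ctx, requestName, req, cr)
	if err != nil {
		return nil, err
	}
	volOptions.Size = req.GetCapacityRange().GetRequiredBytes()
	return volOptions, nil
}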