Merge pull request #390 from ShyamsundarR/stateless-cephfs
Make CephFS plugin stateless reusing RADOS based journal scheme
117 pkg/cephfs/cephfs_util.go Normal file
@ -0,0 +1,117 @@
/*
Copyright 2019 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cephfs

import (
    "fmt"

    "github.com/ceph/ceph-csi/pkg/util"
)

// MDSMap is a representation of the mds map sub-structure returned by 'ceph fs get'
type MDSMap struct {
    FilesystemName string `json:"fs_name"`
}

// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs get'
type CephFilesystemDetails struct {
    ID     int64  `json:"id"`
    MDSMap MDSMap `json:"mdsmap"`
}

func getFscID(monitors, id, key, fsName string) (int64, error) {
    // ceph fs get myfs --format=json
    // {"mdsmap":{...},"id":2}
    var fsDetails CephFilesystemDetails
    err := execCommandJSON(&fsDetails,
        "ceph",
        "-m", monitors,
        "--id", id,
        "--key="+key,
        "-c", util.CephConfigPath,
        "fs", "get", fsName, "--format=json",
    )
    if err != nil {
        return 0, err
    }

    return fsDetails.ID, nil
}

// CephFilesystem is a representation of the json structure returned by 'ceph fs ls'
type CephFilesystem struct {
    Name           string   `json:"name"`
    MetadataPool   string   `json:"metadata_pool"`
    MetadataPoolID int      `json:"metadata_pool_id"`
    DataPools      []string `json:"data_pools"`
    DataPoolIDs    []int    `json:"data_pool_ids"`
}

func getMetadataPool(monitors, id, key, fsName string) (string, error) {
    // ceph fs ls --format=json
    // [{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,...},...]
    var filesystems []CephFilesystem
    err := execCommandJSON(&filesystems,
        "ceph",
        "-m", monitors,
        "--id", id,
        "--key="+key,
        "-c", util.CephConfigPath,
        "fs", "ls", "--format=json",
    )
    if err != nil {
        return "", err
    }

    for _, fs := range filesystems {
        if fs.Name == fsName {
            return fs.MetadataPool, nil
        }
    }

    return "", fmt.Errorf("fsName (%s) not found in Ceph cluster", fsName)
}

// CephFilesystemDump is a representation of the main json structure returned by 'ceph fs dump'
type CephFilesystemDump struct {
    Filesystems []CephFilesystemDetails `json:"filesystems"`
}

func getFsName(monitors, id, key string, fscID int64) (string, error) {
    // ceph fs dump --format=json
    // JSON: {...,"filesystems":[{"mdsmap":{},"id":<n>},...],...}
    var fsDump CephFilesystemDump
    err := execCommandJSON(&fsDump,
        "ceph",
        "-m", monitors,
        "--id", id,
        "--key="+key,
        "-c", util.CephConfigPath,
        "fs", "dump", "--format=json",
    )
    if err != nil {
        return "", err
    }

    for _, fs := range fsDump.Filesystems {
        if fs.ID == fscID {
            return fs.MDSMap.FilesystemName, nil
        }
    }

    return "", fmt.Errorf("fscID (%d) not found in Ceph cluster", fscID)
}
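For illustration, a minimal standalone sketch of how the 'ceph fs get' JSON output maps onto these structs; the sample JSON is hypothetical and the local types mirror, but are independent of, the ones added above.

package main

import (
    "encoding/json"
    "fmt"
)

// Local copies of the structs above, so the sketch compiles on its own.
type mdsMap struct {
    FilesystemName string `json:"fs_name"`
}

type cephFilesystemDetails struct {
    ID     int64  `json:"id"`
    MDSMap mdsMap `json:"mdsmap"`
}

func main() {
    // Hypothetical, truncated output of `ceph fs get myfs --format=json`.
    raw := `{"mdsmap":{"fs_name":"myfs"},"id":2}`

    var details cephFilesystemDetails
    if err := json.Unmarshal([]byte(raw), &details); err != nil {
        panic(err)
    }

    // Prints: fscid=2 fs_name=myfs
    fmt.Printf("fscid=%d fs_name=%s\n", details.ID, details.MDSMap.FilesystemName)
}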
@ -100,6 +100,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *credentials, volID volumeID
func deleteCephUser(volOptions *volumeOptions, adminCr *credentials, volID volumeID) error {
    adminID, userID := genUserIDs(adminCr, volID)

    // TODO: Need to return success if userID is not found
    return execCommandErr("ceph",
        "-m", volOptions.Monitors,
        "-n", adminID,
@ -41,10 +41,39 @@ type controllerCacheEntry struct {
|
||||
}
|
||||
|
||||
var (
|
||||
mtxControllerVolumeID = keymutex.NewHashed(0)
|
||||
mtxControllerVolumeID = keymutex.NewHashed(0)
|
||||
mtxControllerVolumeName = keymutex.NewHashed(0)
|
||||
)
|
||||
|
||||
// CreateVolume creates the volume in backend and store the volume metadata
|
||||
// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID,
|
||||
// and on any error cleans up any created entities
|
||||
func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
|
||||
cr, err := getAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
|
||||
klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil {
|
||||
klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err = createCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
|
||||
klog.Errorf("failed to create ceph user for volume %s: %v", volOptions.RequestName, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateVolume creates a reservation and the volume in backend, if it is not already present
|
||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||
if err := cs.validateCreateVolumeRequest(req); err != nil {
|
||||
klog.Errorf("CreateVolumeRequest validation failed: %v", err)
|
||||
@ -52,67 +81,69 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
}
|
||||
|
||||
// Configuration
|
||||
|
||||
secret := req.GetSecrets()
|
||||
volOptions, err := newVolumeOptions(req.GetParameters(), secret)
|
||||
requestName := req.GetName()
|
||||
volOptions, err := newVolumeOptions(requestName, req.GetCapacityRange().GetRequiredBytes(),
|
||||
req.GetParameters(), secret)
|
||||
if err != nil {
|
||||
klog.Errorf("validation of volume options failed: %v", err)
|
||||
klog.Errorf("validation and extraction of volume options failed: %v", err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
volID := makeVolumeID(req.GetName())
|
||||
// Existence and conflict checks
|
||||
mtxControllerVolumeName.LockKey(requestName)
|
||||
defer mustUnlock(mtxControllerVolumeName, requestName)
|
||||
|
||||
mtxControllerVolumeID.LockKey(string(volID))
|
||||
defer mustUnlock(mtxControllerVolumeID, string(volID))
|
||||
|
||||
// Create a volume in case the user didn't provide one
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
// Admin credentials are required
|
||||
cr, err := getAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err = createVolume(volOptions, cr, volID, req.GetCapacityRange().GetRequiredBytes()); err != nil {
|
||||
klog.Errorf("failed to create volume %s: %v", req.GetName(), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if _, err = createCephUser(volOptions, cr, volID); err != nil {
|
||||
klog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully created volume %s", volID)
|
||||
} else {
|
||||
klog.Infof("cephfs: volume %s is provisioned statically", volID)
|
||||
}
|
||||
|
||||
ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volID}
|
||||
if err := cs.MetadataStore.Create(string(volID), ce); err != nil {
|
||||
klog.Errorf("failed to store a cache entry for volume %s: %v", volID, err)
|
||||
vID, err := checkVolExists(volOptions, secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if vID != nil {
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
VolumeId: vID.VolumeID,
|
||||
CapacityBytes: volOptions.Size,
|
||||
VolumeContext: req.GetParameters(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Reservation
|
||||
vID, err = reserveVol(volOptions, secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errDefer := undoVolReservation(volOptions, *vID, secret)
|
||||
if errDefer != nil {
|
||||
klog.Warningf("failed undoing reservation of volume: %s (%s)",
|
||||
requestName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a volume
|
||||
err = cs.createBackingVolume(volOptions, vID, secret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully created backing volume named %s for request name %s",
|
||||
vID.FsSubvolName, requestName)
|
||||
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
VolumeId: string(volID),
|
||||
VolumeId: vID.VolumeID,
|
||||
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
|
||||
VolumeContext: req.GetParameters(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteVolume deletes the volume in backend
|
||||
// and removes the volume metadata from store
|
||||
// nolint: gocyclo
|
||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
||||
klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin,
|
||||
// that have state information stored in files or kubernetes config maps
|
||||
func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
var (
|
||||
volID = volumeID(req.GetVolumeId())
|
||||
secrets = req.GetSecrets()
|
||||
@ -172,6 +203,65 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// DeleteVolume deletes the volume in backend and its reservation
|
||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
||||
klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
secrets := req.GetSecrets()
|
||||
|
||||
// Find the volume using the provided VolumeID
|
||||
volOptions, vID, err := newVolumeOptionsFromVolID(string(volID), nil, secrets)
|
||||
if err != nil {
|
||||
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
||||
// or partially complete (subvolume and imageOMap are garbage collected already), hence
|
||||
// return success as deletion is complete
|
||||
if _, ok := err.(util.ErrKeyNotFound); ok {
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// ErrInvalidVolID may mean this is a 1.0.0 version volume
|
||||
if _, ok := err.(ErrInvalidVolID); ok && cs.MetadataStore != nil {
|
||||
return cs.deleteVolumeDeprecated(req)
|
||||
}
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Deleting a volume requires admin credentials
|
||||
cr, err := getAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to retrieve admin credentials: %v", err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// lock out parallel delete and create requests against the same volume name as we
|
||||
// cleanup the subvolume and associated omaps for the same
|
||||
mtxControllerVolumeName.LockKey(volOptions.RequestName)
|
||||
defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName)
|
||||
|
||||
if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = deleteCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
|
||||
klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err := undoVolReservation(volOptions, *vID, secrets); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully deleted volume %s", volID)
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// ValidateVolumeCapabilities checks whether the volume capabilities requested
|
||||
// are supported.
|
||||
func (cs *ControllerServer) ValidateVolumeCapabilities(
|
||||
|
@ -26,9 +26,17 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
// version of ceph driver
|
||||
version = "1.0.0"
|
||||
|
||||
// volIDVersion is the version number of volume ID encoding scheme
|
||||
volIDVersion uint16 = 1
|
||||
|
||||
// csiConfigFile is the location of the CSI config file
|
||||
csiConfigFile = "/etc/ceph-csi-config/config.json"
|
||||
|
||||
// RADOS namespace to store CSI specific objects and keys
|
||||
radosNamespace = "csi"
|
||||
)
|
||||
|
||||
// PluginFolder defines the location of ceph plugin
|
||||
@ -46,6 +54,14 @@ type Driver struct {
|
||||
var (
|
||||
// DefaultVolumeMounter for mounting volumes
|
||||
DefaultVolumeMounter string
|
||||
|
||||
// CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing
|
||||
// ceph clusters across CSI instances, to differentiate omap names per CSI instance
|
||||
CSIInstanceID = "default"
|
||||
|
||||
// volJournal is used to maintain RADOS based journals for CO generated
|
||||
// VolumeName to backing CephFS subvolumes
|
||||
volJournal *util.CSIJournal
|
||||
)
|
||||
|
||||
// NewDriver returns new ceph driver
|
||||
@ -77,7 +93,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
|
||||
|
||||
// Run starts a non-blocking gRPC controller, node and identity server for
// the Ceph CSI driver, which can serve multiple parallel requests
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID string, cachePersister util.CachePersister) {
|
||||
klog.Infof("Driver: %v version: %v", driverName, version)
|
||||
|
||||
// Configuration
|
||||
@ -105,7 +121,21 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
|
||||
klog.Fatalf("failed to write ceph configuration file: %v", err)
|
||||
}
|
||||
|
||||
initVolumeMountCache(driverName, mountCacheDir, cachePersister)
|
||||
// Use passed in instance ID, if provided for omap suffix naming
|
||||
if instanceID != "" {
|
||||
CSIInstanceID = instanceID
|
||||
}
|
||||
// Get an instance of the volume journal
|
||||
volJournal = util.NewCSIVolumeJournal()
|
||||
|
||||
// Update keys with CSI instance suffix
|
||||
volJournal.SetCSIDirectorySuffix(CSIInstanceID)
|
||||
|
||||
// Update namespace for storing keys into a specific namespace on RADOS, in the CephFS
|
||||
// metadata pool
|
||||
volJournal.SetNamespace(radosNamespace)
|
||||
|
||||
initVolumeMountCache(driverName, mountCacheDir)
|
||||
if mountCacheDir != "" {
|
||||
if err := remountCachedVolumes(); err != nil {
|
||||
klog.Warningf("failed to remount cached volumes: %v", err)
|
||||
|
37 pkg/cephfs/errors.go Normal file
@ -0,0 +1,37 @@
/*
Copyright 2019 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cephfs

// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID
// formats
type ErrInvalidVolID struct {
    err error
}

func (e ErrInvalidVolID) Error() string {
    return e.err.Error()
}

// ErrNonStaticVolume is returned when a volume is detected as not being
// statically provisioned
type ErrNonStaticVolume struct {
    err error
}

func (e ErrNonStaticVolume) Error() string {
    return e.err.Error()
}
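A minimal sketch of how these sentinel error types are intended to be consumed; the helper below is hypothetical and simply mirrors the type assertions made in DeleteVolume and NodeStageVolume elsewhere in this change.

// classifyVolIDError is a hypothetical helper, not part of this change; it mirrors
// how callers branch on the error types above.
func classifyVolIDError(err error) string {
    switch err.(type) {
    case nil:
        return "volume ID decoded successfully"
    case ErrInvalidVolID:
        // Not a journal-encoded ID: fall back to 1.0.0-era or static volume handling.
        return "legacy or statically provisioned volume"
    case ErrNonStaticVolume:
        return "volume context does not describe a static volume"
    default:
        return "internal error"
    }
}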
136 pkg/cephfs/fsjournal.go Normal file
@ -0,0 +1,136 @@
/*
Copyright 2019 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cephfs

import (
    "github.com/ceph/ceph-csi/pkg/util"

    "k8s.io/klog"
)

// volumeIdentifier structure contains an association between the CSI VolumeID and its subvolume
// name on the backing CephFS instance
type volumeIdentifier struct {
    FsSubvolName string
    VolumeID     string
}

/*
checkVolExists checks whether the passed-in RequestName in volOptions exists on the backend.

**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.

These functions also clean up omap reservations that are stale, i.e. when omap entries exist but
the backing subvolumes are missing, or when one of the omaps exists and the next is missing. This is
because the order of omap creation and deletion is the inverse of each other and is protected by the
request name lock; hence any stale omaps are leftovers from incomplete transactions and are
safe to garbage collect.
*/
func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
    var (
        vi  util.CSIIdentifier
        vid volumeIdentifier
    )

    cr, err := getAdminCredentials(secret)
    if err != nil {
        return nil, err
    }

    imageUUID, err := volJournal.CheckReservation(volOptions.Monitors, cr.id, cr.key,
        volOptions.MetadataPool, volOptions.RequestName, "")
    if err != nil {
        return nil, err
    }
    if imageUUID == "" {
        return nil, nil
    }
    vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID

    // TODO: size checks

    // found a volume already available, process and return it!
    vi = util.CSIIdentifier{
        LocationID:      volOptions.FscID,
        EncodingVersion: volIDVersion,
        ClusterID:       volOptions.ClusterID,
        ObjectUUID:      imageUUID,
    }
    vid.VolumeID, err = vi.ComposeCSIID()
    if err != nil {
        return nil, err
    }

    klog.V(4).Infof("Found existing volume (%s) with subvolume name (%s) for request (%s)",
        vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)

    return &vid, nil
}

// undoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName
func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error {
    cr, err := getAdminCredentials(secret)
    if err != nil {
        return err
    }

    err = volJournal.UndoReservation(volOptions.Monitors, cr.id, cr.key, volOptions.MetadataPool,
        vid.FsSubvolName, volOptions.RequestName)

    return err
}

// reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and
// to generate the volume identifier for the reserved UUID
func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
    var (
        vi  util.CSIIdentifier
        vid volumeIdentifier
    )

    cr, err := getAdminCredentials(secret)
    if err != nil {
        return nil, err
    }

    imageUUID, err := volJournal.ReserveName(volOptions.Monitors, cr.id, cr.key,
        volOptions.MetadataPool, volOptions.RequestName, "")
    if err != nil {
        return nil, err
    }
    vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID

    // generate the volume ID to return to the CO system
    vi = util.CSIIdentifier{
        LocationID:      volOptions.FscID,
        EncodingVersion: volIDVersion,
        ClusterID:       volOptions.ClusterID,
        ObjectUUID:      imageUUID,
    }
    vid.VolumeID, err = vi.ComposeCSIID()
    if err != nil {
        return nil, err
    }

    klog.V(4).Infof("Generated Volume ID (%s) and subvolume name (%s) for request name (%s)",
        vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)

    return &vid, nil
}
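Taken together, the journal helpers above give CreateVolume its idempotency. The sketch below is a condensed restatement of that flow, not additional functionality; it assumes the same package scope, and createVolume and getAdminCredentials are the existing helpers used elsewhere in this change.

// provisionSketch is illustrative only; it restates the CreateVolume flow.
func provisionSketch(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
    // 1. Idempotency: if the request name already has a reservation, return it.
    if vid, err := checkVolExists(volOptions, secret); err != nil || vid != nil {
        return vid, err
    }

    cr, err := getAdminCredentials(secret)
    if err != nil {
        return nil, err
    }

    // 2. Reserve a UUID for the request name in the RADOS journal.
    vid, err := reserveVol(volOptions, secret)
    if err != nil {
        return nil, err
    }

    // 3. Create the backing subvolume; undo the reservation if that fails.
    if err = createVolume(volOptions, cr, volumeID(vid.FsSubvolName), volOptions.Size); err != nil {
        if errDefer := undoVolReservation(volOptions, *vid, secret); errDefer != nil {
            klog.Warningf("failed undoing reservation of volume %s: %v", volOptions.RequestName, errDefer)
        }
        return nil, err
    }

    return vid, nil
}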
@ -16,6 +16,7 @@ type volumeMountCacheEntry struct {
|
||||
DriverVersion string `json:"driverVersion"`
|
||||
|
||||
VolumeID string `json:"volumeID"`
|
||||
Mounter string `json:"mounter"`
|
||||
Secrets map[string]string `json:"secrets"`
|
||||
StagingPath string `json:"stagingPath"`
|
||||
TargetPaths map[string]bool `json:"targetPaths"`
|
||||
@ -25,7 +26,6 @@ type volumeMountCacheEntry struct {
|
||||
type volumeMountCacheMap struct {
|
||||
volumes map[string]volumeMountCacheEntry
|
||||
nodeCacheStore util.NodeCache
|
||||
metadataStore util.CachePersister
|
||||
}
|
||||
|
||||
var (
|
||||
@ -34,10 +34,9 @@ var (
|
||||
volumeMountCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
func initVolumeMountCache(driverName string, mountCacheDir string) {
|
||||
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
|
||||
|
||||
volumeMountCache.metadataStore = cachePersister
|
||||
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
|
||||
volumeMountCache.nodeCacheStore.CacheDir = driverName
|
||||
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir)
|
||||
@ -50,18 +49,19 @@ func remountCachedVolumes() error {
|
||||
}
|
||||
var remountFailCount, remountSuccCount int64
|
||||
me := &volumeMountCacheEntry{}
|
||||
ce := &controllerCacheEntry{}
|
||||
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
volID := me.VolumeID
|
||||
if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Infof("mount-cache: metadata not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
||||
if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
|
||||
if err, ok := err.(util.ErrKeyNotFound); ok {
|
||||
klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := mountOneCacheEntry(ce, me); err == nil {
|
||||
// update Mounter from mount cache
|
||||
volOpts.Mounter = me.Mounter
|
||||
if err := mountOneCacheEntry(volOpts, vid, me); err == nil {
|
||||
remountSuccCount++
|
||||
volumeMountCache.volumes[me.VolumeID] = *me
|
||||
klog.Infof("mount-cache: successfully remounted volume %s", volID)
|
||||
@ -84,7 +84,7 @@ func remountCachedVolumes() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error {
|
||||
func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error {
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
@ -92,17 +92,16 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err
|
||||
err error
|
||||
cr *credentials
|
||||
)
|
||||
volID := ce.VolumeID
|
||||
volOptions := ce.VolOptions
|
||||
volID := vid.VolumeID
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
|
||||
cr, err = getAdminCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var entity *cephEntity
|
||||
entity, err = getCephUser(&volOptions, cr, volID)
|
||||
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -127,12 +126,12 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err
|
||||
}
|
||||
|
||||
if !isMnt {
|
||||
m, err := newMounter(&volOptions)
|
||||
m, err := newMounter(volOptions)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
|
||||
if err := m.mount(me.StagingPath, cr, volOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
@ -204,7 +203,7 @@ func (mc *volumeMountCacheMap) isEnable() bool {
|
||||
return mc.nodeCacheStore.BasePath != ""
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
@ -228,6 +227,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s
|
||||
me.Secrets = encodeCredentials(secrets)
|
||||
me.StagingPath = stagingTargetPath
|
||||
me.TargetPaths = lastTargetPaths
|
||||
me.Mounter = mounter
|
||||
|
||||
me.CreateTime = time.Now()
|
||||
volumeMountCache.volumes[volID] = me
|
||||
|
@ -80,6 +80,10 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||
var (
|
||||
volOptions *volumeOptions
|
||||
vid *volumeIdentifier
|
||||
)
|
||||
if err := validateNodeStageVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
@ -89,15 +93,26 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
|
||||
volOptions, err := newVolumeOptions(req.GetVolumeContext(), req.GetSecrets())
|
||||
volOptions, vid, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets())
|
||||
if err != nil {
|
||||
klog.Errorf("error reading volume options for volume %s: %v", volID, err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
if _, ok := err.(ErrInvalidVolID); !ok {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
// Dynamically provisioned volumes don't have their root path set, do it here
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||
// check for pre-provisioned volumes (plugin versions > 1.0.0)
|
||||
volOptions, vid, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrNonStaticVolume); !ok {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// check for volumes from plugin versions <= 1.0.0
|
||||
volOptions, vid, err = newVolumeOptionsFromVersion1Context(string(volID), req.GetVolumeContext(),
|
||||
req.GetSecrets())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = createMountPoint(stagingTargetPath); err != nil {
|
||||
@ -123,7 +138,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
|
||||
// It's not, mount now
|
||||
if err = ns.mount(volOptions, req); err != nil {
|
||||
if err = ns.mount(volOptions, req, vid); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -132,11 +147,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
|
||||
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest, vid *volumeIdentifier) error {
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
|
||||
cr, err := getCredentialsForVolume(volOptions, volID, req)
|
||||
cr, err := getCredentialsForVolume(volOptions, volumeID(vid.FsSubvolName), req)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
@ -154,7 +169,7 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
|
||||
klog.Errorf("failed to mount volume %s: %v", volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil {
|
||||
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil {
|
||||
klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
}
|
||||
return nil
|
||||
|
@ -42,10 +42,6 @@ func mustUnlock(m keymutex.KeyMutex, key string) {
    }
}

func makeVolumeID(volName string) volumeID {
    return volumeID("csi-cephfs-" + volName)
}

func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
    var (
        cmd = exec.Command(program, args...) // nolint: gosec
@ -72,6 +68,7 @@ func execCommandErr(program string, args ...string) error {
    return err
}

//nolint: unparam
func execCommandJSON(v interface{}, program string, args ...string) error {
    stdout, _, err := execCommand(program, args...)
    if err != nil {
@ -114,7 +114,7 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) {
|
||||
type fuseMounter struct{}
|
||||
|
||||
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) error {
|
||||
args := [...]string{
|
||||
args := []string{
|
||||
mountPoint,
|
||||
"-m", volOptions.Monitors,
|
||||
"-c", util.CephConfigPath,
|
||||
@ -123,6 +123,10 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) er
|
||||
"-o", "nonempty",
|
||||
}
|
||||
|
||||
if volOptions.FsName != "" {
|
||||
args = append(args, "--client_mds_namespace="+volOptions.FsName)
|
||||
}
|
||||
|
||||
_, stderr, err := execCommand("ceph-fuse", args[:]...)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -166,12 +170,18 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions)
|
||||
return err
|
||||
}
|
||||
|
||||
return execCommandErr("mount",
|
||||
args := []string{
|
||||
"-t", "ceph",
|
||||
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
|
||||
mountPoint,
|
||||
"-o", fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key),
|
||||
)
|
||||
}
|
||||
optionsStr := fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key)
|
||||
if volOptions.FsName != "" {
|
||||
optionsStr = optionsStr + fmt.Sprintf(",mds_namespace=%s", volOptions.FsName)
|
||||
}
|
||||
args = append(args, "-o", optionsStr)
|
||||
|
||||
return execCommandErr("mount", args[:]...)
|
||||
}
|
||||
|
||||
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions) error {
|
||||
|
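For reference: with FsName set, the kernel mount assembled above is equivalent to running
mount -t ceph <monitors>:<rootPath> <mountPoint> -o name=<id>,secret=<key>,mds_namespace=<fsName>
while ceph-fuse additionally receives --client_mds_namespace=<fsName>; the angle-bracketed placeholders stand for the corresponding volumeOptions and credentials fields.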
@ -19,17 +19,22 @@ package cephfs
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
)
|
||||
|
||||
type volumeOptions struct {
|
||||
Monitors string `json:"monitors"`
|
||||
Pool string `json:"pool"`
|
||||
RootPath string `json:"rootPath"`
|
||||
|
||||
RequestName string
|
||||
Size int64
|
||||
ClusterID string
|
||||
FsName string
|
||||
FscID int64
|
||||
MetadataPool string
|
||||
Monitors string `json:"monitors"`
|
||||
Pool string `json:"pool"`
|
||||
RootPath string `json:"rootPath"`
|
||||
Mounter string `json:"mounter"`
|
||||
ProvisionVolume bool `json:"provisionVolume"`
|
||||
|
||||
MonValueFromSecret string `json:"monValueFromSecret"`
|
||||
}
|
||||
|
||||
func validateNonEmptyField(field, fieldName string) error {
|
||||
@ -40,35 +45,18 @@ func validateNonEmptyField(field, fieldName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *volumeOptions) validate() error {
|
||||
if err := validateNonEmptyField(o.Monitors, "monitors"); err != nil {
|
||||
if err = validateNonEmptyField(o.MonValueFromSecret, "monValueFromSecret"); err != nil {
|
||||
return err
|
||||
}
|
||||
func extractOptionalOption(dest *string, optionLabel string, options map[string]string) error {
|
||||
opt, ok := options[optionLabel]
|
||||
if !ok {
|
||||
// Option not found, no error as it is optional
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := validateNonEmptyField(o.RootPath, "rootPath"); err != nil {
|
||||
if !o.ProvisionVolume {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if o.ProvisionVolume {
|
||||
return fmt.Errorf("non-empty field rootPath is in conflict with provisionVolume=true")
|
||||
}
|
||||
}
|
||||
|
||||
if o.ProvisionVolume {
|
||||
if err := validateNonEmptyField(o.Pool, "pool"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if o.Mounter != "" {
|
||||
if err := validateMounter(o.Mounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := validateNonEmptyField(opt, optionLabel); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*dest = opt
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -78,6 +66,10 @@ func extractOption(dest *string, optionLabel string, options map[string]string)
|
||||
return fmt.Errorf("missing required field %s", optionLabel)
|
||||
}
|
||||
|
||||
if err := validateNonEmptyField(opt, optionLabel); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*dest = opt
|
||||
return nil
|
||||
}
|
||||
@ -93,63 +85,252 @@ func validateMounter(m string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, error) {
|
||||
func extractMounter(dest *string, options map[string]string) error {
|
||||
if err := extractOptionalOption(dest, "mounter", options); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if *dest != "" {
|
||||
if err := validateMounter(*dest); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMonsAndClusterID(options map[string]string) (string, string, error) {
|
||||
clusterID, ok := options["clusterID"]
|
||||
if !ok {
|
||||
err := fmt.Errorf("clusterID must be set")
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
if err := validateNonEmptyField(clusterID, "clusterID"); err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
monitors, err := util.Mons(csiConfigFile, clusterID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", clusterID)
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
return monitors, clusterID, err
|
||||
}
|
||||
|
||||
// newVolumeOptions generates a new instance of volumeOptions from the provided
|
||||
// CSI request parameters
|
||||
func newVolumeOptions(requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) {
|
||||
var (
|
||||
opts volumeOptions
|
||||
err error
|
||||
)
|
||||
|
||||
// extract mon from secret first
|
||||
if err = extractOption(&opts.MonValueFromSecret, "monValueFromSecret", volOptions); err == nil {
|
||||
mon := ""
|
||||
if mon, err = getMonValFromSecret(secret); err == nil && len(mon) > 0 {
|
||||
opts.Monitors = mon
|
||||
}
|
||||
}
|
||||
if len(opts.Monitors) == 0 {
|
||||
// if not set in secret, get it from parameter
|
||||
if err = extractOption(&opts.Monitors, "monitors", volOptions); err != nil {
|
||||
return nil, fmt.Errorf("either monitors or monValueFromSecret should be set")
|
||||
}
|
||||
}
|
||||
|
||||
if err = extractNewVolOpt(&opts, volOptions); err != nil {
|
||||
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = opts.validate(); err != nil {
|
||||
if err = extractOption(&opts.Pool, "pool", volOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = extractMounter(&opts.Mounter, volOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts.RequestName = requestName
|
||||
opts.Size = size
|
||||
|
||||
cr, err := getAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts.FscID, err = getFscID(opts.Monitors, cr.id, cr.key, opts.FsName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts.MetadataPool, err = getMetadataPool(opts.Monitors, cr.id, cr.key, opts.FsName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts.ProvisionVolume = true
|
||||
|
||||
return &opts, nil
|
||||
}
|
||||
|
||||
func extractNewVolOpt(opts *volumeOptions, volOpt map[string]string) error {
|
||||
// newVolumeOptionsFromVolID generates a new instance of volumeOptions and volumeIdentifier
|
||||
// from the provided CSI VolumeID
|
||||
func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
|
||||
var (
|
||||
vi util.CSIIdentifier
|
||||
volOptions volumeOptions
|
||||
vid volumeIdentifier
|
||||
)
|
||||
|
||||
// Decode the VolID first, to detect older volumes or pre-provisioned volumes
|
||||
// before other errors
|
||||
err := vi.DecomposeCSIID(volID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
|
||||
return nil, nil, ErrInvalidVolID{err}
|
||||
}
|
||||
volOptions.ClusterID = vi.ClusterID
|
||||
vid.FsSubvolName = volJournal.NamingPrefix() + vi.ObjectUUID
|
||||
vid.VolumeID = volID
|
||||
volOptions.FscID = vi.LocationID
|
||||
|
||||
if volOptions.Monitors, err = util.Mons(csiConfigFile, vi.ClusterID); err != nil {
|
||||
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", vi.ClusterID)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
cr, err := getAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
volOptions.FsName, err = getFsName(volOptions.Monitors, cr.id, cr.key, volOptions.FscID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
volOptions.MetadataPool, err = getMetadataPool(volOptions.Monitors, cr.id, cr.key,
|
||||
volOptions.FsName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(volOptions.Monitors, cr.id, cr.key,
|
||||
volOptions.MetadataPool, vi.ObjectUUID, false)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if volOpt != nil {
|
||||
if err = extractOption(&volOptions.Pool, "pool", volOpt); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err = extractMounter(&volOptions.Mounter, volOpt); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
|
||||
volOptions.ProvisionVolume = true
|
||||
|
||||
return &volOptions, &vid, nil
|
||||
}
|
||||
|
||||
// newVolumeOptionsFromVersion1Context generates a new instance of volumeOptions and
|
||||
// volumeIdentifier from the provided CSI volume context, if the provided context was
|
||||
// for a volume created by version 1.0.0 (or prior) of the CSI plugin
|
||||
func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
|
||||
var (
|
||||
opts volumeOptions
|
||||
vid volumeIdentifier
|
||||
provisionVolumeBool string
|
||||
err error
|
||||
)
|
||||
if err = extractOption(&provisionVolumeBool, "provisionVolume", volOpt); err != nil {
|
||||
return err
|
||||
|
||||
// Check if monitors is part of the options, that is an indicator this is an 1.0.0 volume
|
||||
if err = extractOption(&opts.Monitors, "monitors", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// check if there are mon values in secret and if so override option retrieved monitors from
|
||||
// monitors in the secret
|
||||
mon, err := getMonValFromSecret(secrets)
|
||||
if err == nil && len(mon) > 0 {
|
||||
opts.Monitors = mon
|
||||
}
|
||||
|
||||
if err = extractOption(&provisionVolumeBool, "provisionVolume", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
|
||||
return fmt.Errorf("failed to parse provisionVolume: %v", err)
|
||||
return nil, nil, fmt.Errorf("failed to parse provisionVolume: %v", err)
|
||||
}
|
||||
|
||||
if opts.ProvisionVolume {
|
||||
if err = extractOption(&opts.Pool, "pool", volOpt); err != nil {
|
||||
return err
|
||||
if err = extractOption(&opts.Pool, "pool", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
opts.RootPath = getVolumeRootPathCeph(volumeID(volID))
|
||||
} else {
|
||||
if err = extractOption(&opts.RootPath, "rootPath", volOpt); err != nil {
|
||||
return err
|
||||
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// This field is optional, don't check for its presence
|
||||
// nolint
|
||||
// (skip errcheck and gosec as this is optional)
|
||||
extractOption(&opts.Mounter, "mounter", volOpt)
|
||||
return nil
|
||||
if err = extractMounter(&opts.Mounter, options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
vid.FsSubvolName = volID
|
||||
vid.VolumeID = volID
|
||||
|
||||
return &opts, &vid, nil
|
||||
}
|
||||
|
||||
// newVolumeOptionsFromStaticVolume generates a new instance of volumeOptions and
|
||||
// volumeIdentifier from the provided CSI volume context, if the provided context is
|
||||
// detected to be a statically provisioned volume
|
||||
func newVolumeOptionsFromStaticVolume(volID string, options map[string]string) (*volumeOptions, *volumeIdentifier, error) {
|
||||
var (
|
||||
opts volumeOptions
|
||||
vid volumeIdentifier
|
||||
staticVol bool
|
||||
err error
|
||||
)
|
||||
|
||||
val, ok := options["staticVolume"]
|
||||
if !ok {
|
||||
return nil, nil, ErrNonStaticVolume{err}
|
||||
}
|
||||
|
||||
if staticVol, err = strconv.ParseBool(val); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to parse preProvisionedVolume: %v", err)
|
||||
}
|
||||
|
||||
if !staticVol {
|
||||
return nil, nil, ErrNonStaticVolume{err}
|
||||
}
|
||||
|
||||
// The volume is static; ProvisionVolume indicates whether it was dynamically provisioned,
// hence store the negation of the static boolean
|
||||
opts.ProvisionVolume = !staticVol
|
||||
|
||||
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(options)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err = extractOption(&opts.FsName, "fsName", options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err = extractMounter(&opts.Mounter, options); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
vid.FsSubvolName = opts.RootPath
|
||||
vid.VolumeID = volID
|
||||
|
||||
return &opts, &vid, nil
|
||||
}
|
||||
|
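As an illustration of the static path above, a hypothetical volume context as newVolumeOptionsFromStaticVolume would receive it; the key names are the options extracted above, and all values (including the mounter choice and the volume handle) are placeholders.

// Hypothetical pre-provisioned (static) volume context; illustration only.
staticVolContext := map[string]string{
    "staticVolume": "true",
    "clusterID":    "my-cluster-id",
    "fsName":       "myfs",
    "rootPath":     "/volumes/csi/pre-created-subvol",
    "mounter":      "kernel",
}

// volOpts.RootPath and vid.FsSubvolName both end up as the provided rootPath,
// and vid.VolumeID carries the CO-passed volume handle.
volOpts, vid, err := newVolumeOptionsFromStaticVolume("my-static-volume-handle", staticVolContext)
if err != nil {
    // handle error
}
_ = volOpts
_ = vid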
@ -306,7 +306,7 @@ func genSnapFromSnapID(rbdSnap *rbdSnapshot, snapshotID string, credentials map[
|
||||
return err
|
||||
}
|
||||
|
||||
rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.PoolID)
|
||||
rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.LocationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -359,7 +359,7 @@ func genVolFromVolID(rbdVol *rbdVolume, volumeID string, credentials map[string]
|
||||
}
|
||||
|
||||
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, rbdVol.AdminID, key,
|
||||
vi.PoolID)
|
||||
vi.LocationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -123,17 +123,22 @@ func GetPoolName(monitors string, adminID string, key string, poolID int64) (str
|
||||
}
|
||||
|
||||
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name
|
||||
func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyValue string) error {
|
||||
func SetOMapKeyValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey, keyValue string) error {
|
||||
// Command: "rados <options> setomapval oMapName oMapKey keyValue"
|
||||
|
||||
_, _, err := ExecCommand(
|
||||
"rados",
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
"--id", adminID,
|
||||
"--key="+key,
|
||||
"--key=" + key,
|
||||
"-c", CephConfigPath,
|
||||
"-p", poolName,
|
||||
"setomapval", oMapName, oMapKey, keyValue)
|
||||
"setomapval", oMapName, oMapKey, keyValue,
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
args = append(args, "--namespace="+namespace)
|
||||
}
|
||||
|
||||
_, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed adding key (%s with value %s), to omap (%s) in "+
|
||||
"pool (%s): (%v)", oMapKey, keyValue, oMapName, poolName, err)
|
||||
@ -144,7 +149,7 @@ func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyVal
|
||||
}
|
||||
|
||||
// GetOMapValue gets the value for the given key from the named omap
|
||||
func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (string, error) {
|
||||
func GetOMapValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) (string, error) {
|
||||
// Command: "rados <options> getomapval oMapName oMapKey <outfile>"
|
||||
// No such key: replicapool/csi.volumes.directory.default/csi.volname
|
||||
tmpFile, err := ioutil.TempFile("", "omap-get-")
|
||||
@ -155,14 +160,20 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s
|
||||
defer tmpFile.Close()
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
stdout, stderr, err := ExecCommand(
|
||||
"rados",
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
"--id", adminID,
|
||||
"--key="+key,
|
||||
"--key=" + key,
|
||||
"-c", CephConfigPath,
|
||||
"-p", poolName,
|
||||
"getomapval", oMapName, oMapKey, tmpFile.Name())
|
||||
"getomapval", oMapName, oMapKey, tmpFile.Name(),
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
args = append(args, "--namespace="+namespace)
|
||||
}
|
||||
|
||||
stdout, stderr, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
// no logs, as attempting to check for non-existent key/value is done even on
|
||||
// regular call sequences
|
||||
@ -189,17 +200,22 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s
|
||||
}
|
||||
|
||||
// RemoveOMapKey removes the omap key from the given omap name
|
||||
func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) error {
|
||||
func RemoveOMapKey(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) error {
|
||||
// Command: "rados <options> rmomapkey oMapName oMapKey"
|
||||
|
||||
_, _, err := ExecCommand(
|
||||
"rados",
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
"--id", adminID,
|
||||
"--key="+key,
|
||||
"--key=" + key,
|
||||
"-c", CephConfigPath,
|
||||
"-p", poolName,
|
||||
"rmomapkey", oMapName, oMapKey)
|
||||
"rmomapkey", oMapName, oMapKey,
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
args = append(args, "--namespace="+namespace)
|
||||
}
|
||||
|
||||
_, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
// NOTE: Missing omap key removal does not return an error
|
||||
klog.Errorf("failed removing key (%s), from omap (%s) in "+
|
||||
@ -212,17 +228,22 @@ func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) e
|
||||
|
||||
// CreateObject creates the object name passed in and returns ErrObjectExists if the provided object
|
||||
// is already present in rados
|
||||
func CreateObject(monitors, adminID, key, poolName, objectName string) error {
|
||||
func CreateObject(monitors, adminID, key, poolName, namespace, objectName string) error {
|
||||
// Command: "rados <options> create objectName"
|
||||
|
||||
stdout, _, err := ExecCommand(
|
||||
"rados",
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
"--id", adminID,
|
||||
"--key="+key,
|
||||
"--key=" + key,
|
||||
"-c", CephConfigPath,
|
||||
"-p", poolName,
|
||||
"create", objectName)
|
||||
"create", objectName,
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
args = append(args, "--namespace="+namespace)
|
||||
}
|
||||
|
||||
stdout, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed creating omap (%s) in pool (%s): (%v)", objectName, poolName, err)
|
||||
if strings.Contains(string(stdout), "error creating "+poolName+"/"+objectName+
|
||||
@ -237,17 +258,22 @@ func CreateObject(monitors, adminID, key, poolName, objectName string) error {
|
||||
|
||||
// RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound if the provided omap
// is not found in rados
|
||||
func RemoveObject(monitors, adminID, key, poolName, oMapName string) error {
|
||||
func RemoveObject(monitors, adminID, key, poolName, namespace, oMapName string) error {
|
||||
// Command: "rados <options> rm oMapName"
|
||||
|
||||
stdout, _, err := ExecCommand(
|
||||
"rados",
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
"--id", adminID,
|
||||
"--key="+key,
|
||||
"--key=" + key,
|
||||
"-c", CephConfigPath,
|
||||
"-p", poolName,
|
||||
"rm", oMapName)
|
||||
"rm", oMapName,
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
args = append(args, "--namespace="+namespace)
|
||||
}
|
||||
|
||||
stdout, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed removing omap (%s) in pool (%s): (%v)", oMapName, poolName, err)
|
||||
if strings.Contains(string(stdout), "error removing "+poolName+">"+oMapName+
|
||||
|
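With a RADOS namespace configured (the CephFS plugin sets radosNamespace = "csi" in driver.go), each rados invocation above gains a --namespace=csi argument, so the journal omaps and UUID directory objects are kept in that namespace of the pool rather than the default namespace. For manual inspection, listing them would look something like rados -p <metadata-pool> --namespace=csi ls; the pool name is a placeholder and the command is shown only as a hint, not as part of this change.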
@ -102,7 +102,7 @@ func GenerateVolID(monitors, id, key, pool, clusterID, objUUID string, volIDVers
|
||||
|
||||
// generate the volume ID to return to the CO system
|
||||
vi := CSIIdentifier{
|
||||
PoolID: poolID,
|
||||
LocationID: poolID,
|
||||
EncodingVersion: volIDVersion,
|
||||
ClusterID: clusterID,
|
||||
ObjectUUID: objUUID,
|
||||
|
@ -32,8 +32,8 @@ The CSI identifier is composed as elaborated in the comment against ComposeCSIID
|
||||
DecomposeCSIID is the inverse of the same function.
|
||||
|
||||
The CSIIdentifier structure carries the following fields,
|
||||
- PoolID: 64 bit integer of the pool that the volume belongs to, where the ID comes from Ceph pool
|
||||
identifier for the corresponding pool name.
|
||||
- LocationID: 64 bit integer identifier determining the location of the volume on the Ceph cluster.
|
||||
It is the ID of the poolname or fsname, for RBD or CephFS backed volumes respectively.
|
||||
- EncodingVersion: Carries the version number of the encoding scheme used to encode the CSI ID,
|
||||
and is preserved for any future proofing w.r.t changes in the encoding scheme, and to retain
|
||||
ability to parse backward compatible encodings.
|
||||
@ -43,7 +43,7 @@ The CSIIdentifier structure carries the following fields,
|
||||
corresponds to this CSI ID.
|
||||
*/
|
||||
type CSIIdentifier struct {
|
||||
PoolID int64 // TODO: Name appropriately when reused for CephFS
|
||||
LocationID int64
|
||||
EncodingVersion uint16
|
||||
ClusterID string
|
||||
ObjectUUID string
|
||||
@ -87,7 +87,7 @@ func (ci CSIIdentifier) ComposeCSIID() (string, error) {
|
||||
binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID)))
|
||||
clusterIDLength := hex.EncodeToString(buf16)
|
||||
|
||||
binary.BigEndian.PutUint64(buf64, uint64(ci.PoolID))
|
||||
binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID))
|
||||
poolIDEncodedHex := hex.EncodeToString(buf64)
|
||||
|
||||
return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID,
|
||||
@ -136,7 +136,7 @@ func (ci *CSIIdentifier) DecomposeCSIID(composedCSIID string) (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ci.PoolID = int64(binary.BigEndian.Uint64(buf64))
|
||||
ci.LocationID = int64(binary.BigEndian.Uint64(buf64))
|
||||
// 16 for poolID encoding and 1 for '-' separator
|
||||
bytesToProcess -= 17
|
||||
nextFieldStartIdx = nextFieldStartIdx + 17
|
||||
|
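A short round-trip sketch of the renamed field; the field values are borrowed from the test data below, and the exact encoded string is intentionally not shown here.

// Illustration only: compose and decompose a CSI ID using the renamed LocationID field.
vi := CSIIdentifier{
    LocationID:      0xffff, // pool ID for RBD, filesystem (fscid) for CephFS
    EncodingVersion: 0xffff,
    ClusterID:       "01616094-9d93-4178-bf45-c7eac19e8b15",
    ObjectUUID:      "00000000-1111-2222-bbbb-cacacacacaca",
}

id, err := vi.ComposeCSIID()
if err != nil {
    // handle error
}

var decoded CSIIdentifier
if err := decoded.DecomposeCSIID(id); err != nil {
    // handle error
}
// decoded now carries the same LocationID, ClusterID and ObjectUUID as vi.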
@ -33,7 +33,7 @@ type testTuple struct {
|
||||
var testData = []testTuple{
|
||||
{
|
||||
vID: CSIIdentifier{
|
||||
PoolID: 0xffff,
|
||||
LocationID: 0xffff,
|
||||
EncodingVersion: 0xffff,
|
||||
ClusterID: "01616094-9d93-4178-bf45-c7eac19e8b15",
|
||||
ObjectUUID: "00000000-1111-2222-bbbb-cacacacacaca",
|
||||
|
@ -114,6 +114,9 @@ type CSIJournal struct {
|
||||
|
||||
// volume name prefix for naming on Ceph rbd or FS, suffix is a uuid generated per volume
|
||||
namingPrefix string
|
||||
|
||||
// namespace in which the RADOS objects are stored, default is no namespace
|
||||
namespace string
|
||||
}
|
||||
|
||||
// CSIVolumeJournal returns an instance of volume keys
|
||||
@ -125,6 +128,7 @@ func NewCSIVolumeJournal() *CSIJournal {
|
||||
csiNameKey: "csi.volname",
|
||||
namingPrefix: "csi-vol-",
|
||||
cephSnapSourceKey: "",
|
||||
namespace: "",
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,6 +141,7 @@ func NewCSISnapshotJournal() *CSIJournal {
|
||||
csiNameKey: "csi.snapname",
|
||||
namingPrefix: "csi-snap-",
|
||||
cephSnapSourceKey: "csi.source",
|
||||
namespace: "",
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,6 +155,11 @@ func (cj *CSIJournal) SetCSIDirectorySuffix(suffix string) {
|
||||
cj.csiDirectory = cj.csiDirectory + "." + suffix
|
||||
}
|
||||
|
||||
// SetNamespace sets the namespace in which all RADOS objects would be created
|
||||
func (cj *CSIJournal) SetNamespace(ns string) {
|
||||
cj.namespace = ns
|
||||
}
|
||||
|
||||
/*
|
||||
CheckReservation checks if given request name contains a valid reservation
|
||||
- If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned
|
||||
@ -177,7 +187,7 @@ func (cj *CSIJournal) CheckReservation(monitors, id, key, pool, reqName, parentN
|
||||
}
|
||||
|
||||
// check if request name is already part of the directory omap
|
||||
objUUID, err := GetOMapValue(monitors, id, key, pool, cj.csiDirectory,
|
||||
objUUID, err := GetOMapValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for volume to be absent, any other error
|
||||
@ -237,7 +247,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
|
||||
// delete volume UUID omap (first, inverse of create order)
|
||||
// TODO: Check cases where volName can be empty, and we need to just cleanup the reqName
|
||||
imageUUID := strings.TrimPrefix(volName, cj.namingPrefix)
|
||||
err := RemoveObject(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
err := RemoveObject(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectNotFound); !ok {
|
||||
klog.Errorf("failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+imageUUID, err)
|
||||
@ -246,7 +256,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
|
||||
}
|
||||
|
||||
// delete the request name key (last, inverse of create order)
|
||||
err = RemoveOMapKey(monitors, id, key, pool, cj.csiDirectory,
|
||||
err = RemoveOMapKey(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
|
||||
@ -259,7 +269,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
|
||||
// reserveOMapName creates an omap with the passed-in oMapNamePrefix and a generated <uuid>.
// It ensures the generated omap name does not already exist; if conflicts are detected, a set
// number of retries with newer uuids is attempted before returning an error
|
||||
func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, error) {
|
||||
func reserveOMapName(monitors, id, key, pool, namespace, oMapNamePrefix string) (string, error) {
|
||||
var iterUUID string
|
||||
|
||||
maxAttempts := 5
|
||||
@ -268,7 +278,7 @@ func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, er
|
||||
// generate a uuid for the image name
|
||||
iterUUID = uuid.NewUUID().String()
|
||||
|
||||
err := CreateObject(monitors, id, key, pool, oMapNamePrefix+iterUUID)
|
||||
err := CreateObject(monitors, id, key, pool, namespace, oMapNamePrefix+iterUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectExists); ok {
|
||||
attempt++
|
||||
@ -315,15 +325,15 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
|
||||
// NOTE: If any service loss occurs post creation of the UUID directory, and before
|
||||
// setting the request name key (csiNameKey) to point back to the UUID directory, the
|
||||
// UUID directory key will be leaked
|
||||
volUUID, err := reserveOMapName(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix)
|
||||
volUUID, err := reserveOMapName(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Create request name (csiNameKey) key in csiDirectory and store the UUId based
|
||||
// volume name into it
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.csiDirectory, cj.csiNameKeyPrefix+reqName,
|
||||
volUUID)
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName, volUUID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -339,7 +349,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
|
||||
}()
|
||||
|
||||
// Update UUID directory to store CSI request name
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.csiNameKey, reqName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -347,7 +357,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
|
||||
|
||||
if snapSource {
|
||||
// Update UUID directory to store source volume UUID in case of snapshots
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.cephSnapSourceKey, parentName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -373,14 +383,14 @@ func (cj *CSIJournal) GetObjectUUIDData(monitors, id, key, pool, objectUUID stri
|
||||
}
|
||||
|
||||
// TODO: fetch all omap vals in one call, rather than making multiple listomapvals calls
|
||||
requestName, err := GetOMapValue(monitors, id, key, pool,
|
||||
requestName, err := GetOMapValue(monitors, id, key, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
if snapSource {
|
||||
sourceName, err = GetOMapValue(monitors, id, key, pool,
|
||||
sourceName, err = GetOMapValue(monitors, id, key, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
|