mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-09 16:00:22 +00:00
switch to cephfs, utils, and csicommon to new loging system
Signed-off-by: Daniel-Pivonka <dpivonka@redhat.com>
This commit is contained in:
parent
7d3a18c5b7
commit
01a78cace5
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
@ -33,11 +34,11 @@ type CephFilesystemDetails struct {
|
||||
MDSMap MDSMap `json:"mdsmap"`
|
||||
}
|
||||
|
||||
func getFscID(monitors string, cr *util.Credentials, fsName string) (int64, error) {
|
||||
func getFscID(ctx context.Context, monitors string, cr *util.Credentials, fsName string) (int64, error) {
|
||||
// ceph fs get myfs --format=json
|
||||
// {"mdsmap":{...},"id":2}
|
||||
var fsDetails CephFilesystemDetails
|
||||
err := execCommandJSON(&fsDetails,
|
||||
err := execCommandJSON(ctx, &fsDetails,
|
||||
"ceph",
|
||||
"-m", monitors,
|
||||
"--id", cr.ID,
|
||||
@ -61,11 +62,11 @@ type CephFilesystem struct {
|
||||
DataPoolIDs []int `json:"data_pool_ids"`
|
||||
}
|
||||
|
||||
func getMetadataPool(monitors string, cr *util.Credentials, fsName string) (string, error) {
|
||||
func getMetadataPool(ctx context.Context, monitors string, cr *util.Credentials, fsName string) (string, error) {
|
||||
// ./tbox ceph fs ls --format=json
|
||||
// [{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,...},...]
|
||||
var filesystems []CephFilesystem
|
||||
err := execCommandJSON(&filesystems,
|
||||
err := execCommandJSON(ctx, &filesystems,
|
||||
"ceph",
|
||||
"-m", monitors,
|
||||
"--id", cr.ID,
|
||||
@ -91,11 +92,11 @@ type CephFilesystemDump struct {
|
||||
Filesystems []CephFilesystemDetails `json:"filesystems"`
|
||||
}
|
||||
|
||||
func getFsName(monitors string, cr *util.Credentials, fscID int64) (string, error) {
|
||||
func getFsName(ctx context.Context, monitors string, cr *util.Credentials, fscID int64) (string, error) {
|
||||
// ./tbox ceph fs dump --format=json
|
||||
// JSON: {...,"filesystems":[{"mdsmap":{},"id":<n>},...],...}
|
||||
var fsDump CephFilesystemDump
|
||||
err := execCommandJSON(&fsDump,
|
||||
err := execCommandJSON(ctx, &fsDump,
|
||||
"ceph",
|
||||
"-m", monitors,
|
||||
"--id", cr.ID,
|
||||
|
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
)
|
||||
|
||||
@ -33,11 +35,11 @@ func getCephUserName(volID volumeID) string {
|
||||
return cephUserPrefix + string(volID)
|
||||
}
|
||||
|
||||
func deleteCephUserDeprecated(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error {
|
||||
func deleteCephUserDeprecated(ctx context.Context, volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error {
|
||||
adminID, userID := genUserIDs(adminCr, volID)
|
||||
|
||||
// TODO: Need to return success if userID is not found
|
||||
return execCommandErr("ceph",
|
||||
return execCommandErr(ctx, "ceph",
|
||||
"-m", volOptions.Monitors,
|
||||
"-n", adminID,
|
||||
"--keyfile="+adminCr.KeyFile,
|
||||
|
@ -46,21 +46,21 @@ var (
|
||||
)
|
||||
|
||||
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities
|
||||
func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
|
||||
func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
|
||||
cr, err := util.NewAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
|
||||
klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err)
|
||||
if err = createVolume(ctx, volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to create volume %s: %v"), volOptions.RequestName, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil {
|
||||
klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer)
|
||||
if errDefer := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil {
|
||||
klog.Warningf(util.Log(ctx, "failed purging volume: %s (%s)"), volOptions.RequestName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -71,17 +71,17 @@ func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *
|
||||
// CreateVolume creates a reservation and the volume in backend, if it is not already present
|
||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||
if err := cs.validateCreateVolumeRequest(req); err != nil {
|
||||
klog.Errorf("CreateVolumeRequest validation failed: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "CreateVolumeRequest validation failed: %v"), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configuration
|
||||
secret := req.GetSecrets()
|
||||
requestName := req.GetName()
|
||||
volOptions, err := newVolumeOptions(requestName, req.GetCapacityRange().GetRequiredBytes(),
|
||||
volOptions, err := newVolumeOptions(ctx, requestName, req.GetCapacityRange().GetRequiredBytes(),
|
||||
req.GetParameters(), secret)
|
||||
if err != nil {
|
||||
klog.Errorf("validation and extraction of volume options failed: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "validation and extraction of volume options failed: %v"), err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
@ -89,7 +89,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
idLk := volumeNameLocker.Lock(requestName)
|
||||
defer volumeNameLocker.Unlock(idLk, requestName)
|
||||
|
||||
vID, err := checkVolExists(volOptions, secret)
|
||||
vID, err := checkVolExists(ctx, volOptions, secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
@ -104,27 +104,27 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
}
|
||||
|
||||
// Reservation
|
||||
vID, err = reserveVol(volOptions, secret)
|
||||
vID, err = reserveVol(ctx, volOptions, secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errDefer := undoVolReservation(volOptions, *vID, secret)
|
||||
errDefer := undoVolReservation(ctx, volOptions, *vID, secret)
|
||||
if errDefer != nil {
|
||||
klog.Warningf("failed undoing reservation of volume: %s (%s)",
|
||||
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"),
|
||||
requestName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a volume
|
||||
err = cs.createBackingVolume(volOptions, vID, secret)
|
||||
err = cs.createBackingVolume(ctx, volOptions, vID, secret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully created backing volume named %s for request name %s",
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully created backing volume named %s for request name %s"),
|
||||
vID.FsSubvolName, requestName)
|
||||
|
||||
return &csi.CreateVolumeResponse{
|
||||
@ -138,7 +138,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
|
||||
// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin,
|
||||
// that have state information stored in files or kubernetes config maps
|
||||
func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
var (
|
||||
volID = volumeID(req.GetVolumeId())
|
||||
secrets = req.GetSecrets()
|
||||
@ -147,7 +147,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
ce := &controllerCacheEntry{}
|
||||
if err := cs.MetadataStore.Get(string(volID), ce); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
|
||||
klog.Infof(util.Log(ctx, "cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)"), volID, err)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
@ -157,14 +157,14 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
if !ce.VolOptions.ProvisionVolume {
|
||||
// DeleteVolume() is forbidden for statically provisioned volumes!
|
||||
|
||||
klog.Warningf("volume %s is provisioned statically, aborting delete", volID)
|
||||
klog.Warningf(util.Log(ctx, "volume %s is provisioned statically, aborting delete"), volID)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// mons may have changed since create volume,
|
||||
// retrieve the latest mons and override old mons
|
||||
if mon, secretsErr := util.GetMonValFromSecret(secrets); secretsErr == nil && len(mon) > 0 {
|
||||
klog.Infof("overriding monitors [%q] with [%q] for volume %s", ce.VolOptions.Monitors, mon, volID)
|
||||
klog.Infof(util.Log(ctx, "overriding monitors [%q] with [%q] for volume %s"), ce.VolOptions.Monitors, mon, volID)
|
||||
ce.VolOptions.Monitors = mon
|
||||
}
|
||||
|
||||
@ -172,7 +172,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
|
||||
cr, err := util.NewAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to retrieve admin credentials: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
@ -180,13 +180,13 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
idLk := volumeIDLocker.Lock(string(volID))
|
||||
defer volumeIDLocker.Unlock(idLk, string(volID))
|
||||
|
||||
if err = purgeVolumeDeprecated(volID, cr, &ce.VolOptions); err != nil {
|
||||
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
||||
if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = deleteCephUserDeprecated(&ce.VolOptions, cr, volID); err != nil {
|
||||
klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
|
||||
if err = deleteCephUserDeprecated(ctx, &ce.VolOptions, cr, volID); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete ceph user for volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
@ -194,7 +194,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully deleted volume %s", volID)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully deleted volume %s"), volID)
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
@ -202,7 +202,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
||||
// DeleteVolume deletes the volume in backend and its reservation
|
||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
||||
klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "DeleteVolumeRequest validation failed: %v"), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -210,7 +210,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
secrets := req.GetSecrets()
|
||||
|
||||
// Find the volume using the provided VolumeID
|
||||
volOptions, vID, err := newVolumeOptionsFromVolID(string(volID), nil, secrets)
|
||||
volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
|
||||
if err != nil {
|
||||
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
||||
// or partially complete (subvolume and imageOMap are garbage collected already), hence
|
||||
@ -221,7 +221,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
|
||||
// ErrInvalidVolID may mean this is an 1.0.0 version volume
|
||||
if _, ok := err.(ErrInvalidVolID); ok && cs.MetadataStore != nil {
|
||||
return cs.deleteVolumeDeprecated(req)
|
||||
return cs.deleteVolumeDeprecated(ctx, req)
|
||||
}
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -230,7 +230,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
// Deleting a volume requires admin credentials
|
||||
cr, err := util.NewAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to retrieve admin credentials: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
@ -240,16 +240,16 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
idLk := volumeNameLocker.Lock(volOptions.RequestName)
|
||||
defer volumeNameLocker.Unlock(idLk, volOptions.RequestName)
|
||||
|
||||
if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
||||
if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err := undoVolReservation(volOptions, *vID, secrets); err != nil {
|
||||
if err := undoVolReservation(ctx, volOptions, *vID, secrets); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully deleted volume %s", volID)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully deleted volume %s"), volID)
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
|
||||
"k8s.io/klog"
|
||||
@ -43,7 +45,7 @@ because, the order of omap creation and deletion are inverse of each other, and
|
||||
request name lock, and hence any stale omaps are leftovers from incomplete transactions and are
|
||||
hence safe to garbage collect.
|
||||
*/
|
||||
func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
|
||||
func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
|
||||
var (
|
||||
vi util.CSIIdentifier
|
||||
vid volumeIdentifier
|
||||
@ -55,7 +57,7 @@ func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volum
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
imageUUID, err := volJournal.CheckReservation(volOptions.Monitors, cr,
|
||||
imageUUID, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr,
|
||||
volOptions.MetadataPool, volOptions.RequestName, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -79,21 +81,21 @@ func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volum
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Found existing volume (%s) with subvolume name (%s) for request (%s)",
|
||||
klog.V(4).Infof(util.Log(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)"),
|
||||
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
|
||||
|
||||
return &vid, nil
|
||||
}
|
||||
|
||||
// undoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName
|
||||
func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error {
|
||||
func undoVolReservation(ctx context.Context, volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error {
|
||||
cr, err := util.NewAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
err = volJournal.UndoReservation(volOptions.Monitors, cr, volOptions.MetadataPool,
|
||||
err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool,
|
||||
vid.FsSubvolName, volOptions.RequestName)
|
||||
|
||||
return err
|
||||
@ -101,7 +103,7 @@ func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret
|
||||
|
||||
// reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and,
|
||||
// to generate the volume identifier for the reserved UUID
|
||||
func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
|
||||
func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
|
||||
var (
|
||||
vi util.CSIIdentifier
|
||||
vid volumeIdentifier
|
||||
@ -113,7 +115,7 @@ func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIde
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
imageUUID, err := volJournal.ReserveName(volOptions.Monitors, cr,
|
||||
imageUUID, err := volJournal.ReserveName(ctx, volOptions.Monitors, cr,
|
||||
volOptions.MetadataPool, volOptions.RequestName, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -132,7 +134,7 @@ func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIde
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Generated Volume ID (%s) and subvolume name (%s) for request name (%s)",
|
||||
klog.V(4).Infof(util.Log(ctx, "Generated Volume ID (%s) and subvolume name (%s) for request name (%s)"),
|
||||
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
|
||||
|
||||
return &vid, nil
|
||||
|
@ -1,6 +1,23 @@
|
||||
/*
|
||||
Copyright 2019 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"os"
|
||||
"sync"
|
||||
@ -51,7 +68,7 @@ func remountCachedVolumes() error {
|
||||
me := &volumeMountCacheEntry{}
|
||||
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
volID := me.VolumeID
|
||||
if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
|
||||
if volOpts, vid, err := newVolumeOptionsFromVolID(context.TODO(), me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
|
||||
if err, ok := err.(util.ErrKeyNotFound); ok {
|
||||
klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
@ -101,7 +118,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
volOptions.RootPath, err = getVolumeRootPathCeph(context.TODO(), volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -131,7 +148,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
|
||||
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
if err := m.mount(me.StagingPath, cr, volOptions); err != nil {
|
||||
if err := m.mount(context.TODO(), me.StagingPath, cr, volOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
@ -140,7 +157,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
|
||||
mountOptions := []string{"bind"}
|
||||
for targetPath, readOnly := range me.TargetPaths {
|
||||
if err := cleanupMountPoint(targetPath); err == nil {
|
||||
if err := bindMount(me.StagingPath, targetPath, readOnly, mountOptions); err != nil {
|
||||
if err := bindMount(context.TODO(), me.StagingPath, targetPath, readOnly, mountOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
|
||||
volID, me.StagingPath, targetPath, readOnly, err)
|
||||
} else {
|
||||
@ -156,7 +173,7 @@ func cleanupMountPoint(mountPoint string) error {
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
if isCorruptedMnt(err) {
|
||||
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
|
||||
err := execCommandErr("umount", mountPoint)
|
||||
err := execCommandErr(context.TODO(), "umount", mountPoint)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err)
|
||||
// ignore error return err
|
||||
@ -205,7 +222,7 @@ func (mc *volumeMountCacheMap) isEnable() bool {
|
||||
return mc.nodeCacheStore.BasePath != ""
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error {
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(ctx context.Context, volID, stagingTargetPath, mounter string, secrets map[string]string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
@ -216,11 +233,11 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter
|
||||
me, ok := volumeMountCache.volumes[volID]
|
||||
if ok {
|
||||
if me.StagingPath == stagingTargetPath {
|
||||
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: node unexpected restage volume for volume %s"), volID)
|
||||
return nil
|
||||
}
|
||||
lastTargetPaths = me.TargetPaths
|
||||
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: node stage volume ignore last cache entry for volume %s"), volID)
|
||||
}
|
||||
|
||||
me = volumeMountCacheEntry{DriverVersion: util.DriverVersion}
|
||||
@ -246,7 +263,7 @@ func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
|
||||
return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID))
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodePublishVolume(volID, targetPath string, readOnly bool) error {
|
||||
func (mc *volumeMountCacheMap) nodePublishVolume(ctx context.Context, volID, targetPath string, readOnly bool) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
@ -258,10 +275,10 @@ func (mc *volumeMountCacheMap) nodePublishVolume(volID, targetPath string, readO
|
||||
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
|
||||
}
|
||||
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
|
||||
return mc.updateNodeCache(volID)
|
||||
return mc.updateNodeCache(ctx, volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID, targetPath string) error {
|
||||
func (mc *volumeMountCacheMap) nodeUnPublishVolume(ctx context.Context, volID, targetPath string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
@ -273,13 +290,13 @@ func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID, targetPath string) err
|
||||
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
|
||||
}
|
||||
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
|
||||
return mc.updateNodeCache(volID)
|
||||
return mc.updateNodeCache(ctx, volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
|
||||
func (mc *volumeMountCacheMap) updateNodeCache(ctx context.Context, volID string) error {
|
||||
me := volumeMountCache.volumes[volID]
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata not found, delete mount cache failed for volume %s", volID)
|
||||
klog.Infof(util.Log(ctx, "mount-cache: metadata not found, delete mount cache failed for volume %s"), volID)
|
||||
}
|
||||
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
|
||||
volOptions, _, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets())
|
||||
volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets())
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrInvalidVolID); !ok {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -110,50 +110,50 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
isMnt, err := util.IsMountPoint(stagingTargetPath)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("stat failed: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "stat failed: %v"), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if isMnt {
|
||||
klog.Infof("cephfs: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: volume %s is already mounted to %s, skipping"), volID, stagingTargetPath)
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// It's not, mount now
|
||||
if err = ns.mount(volOptions, req); err != nil {
|
||||
if err = ns.mount(ctx, volOptions, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully mounted volume %s to %s"), volID, stagingTargetPath)
|
||||
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
|
||||
func (*NodeServer) mount(ctx context.Context, volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
|
||||
cr, err := getCredentialsForVolume(volOptions, req)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
|
||||
klog.Errorf(util.Log(ctx, "failed to get ceph credentials for volume %s: %v"), volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
m, err := newMounter(volOptions)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to create mounter for volume %s: %v", volID, err)
|
||||
klog.Errorf(util.Log(ctx, "failed to create mounter for volume %s: %v"), volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.V(4).Infof("cephfs: mounting volume %s with %s", volID, m.name())
|
||||
klog.V(4).Infof(util.Log(ctx, "cephfs: mounting volume %s with %s"), volID, m.name())
|
||||
|
||||
if err = m.mount(stagingTargetPath, cr, volOptions); err != nil {
|
||||
klog.Errorf("failed to mount volume %s: %v", volID, err)
|
||||
if err = m.mount(ctx, stagingTargetPath, cr, volOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to mount volume %s: %v"), volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil {
|
||||
klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
if err := volumeMountCache.nodeStageVolume(ctx, req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to stage volume %s %s: %v"), volID, stagingTargetPath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -173,7 +173,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
if err := util.CreateMountPoint(targetPath); err != nil {
|
||||
klog.Errorf("failed to create mount point at %s: %v", targetPath, err)
|
||||
klog.Errorf(util.Log(ctx, "failed to create mount point at %s: %v"), targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
@ -204,31 +204,31 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
isMnt, err := util.IsMountPoint(targetPath)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("stat failed: %v", err)
|
||||
klog.Errorf(util.Log(ctx, "stat failed: %v"), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if isMnt {
|
||||
klog.Infof("cephfs: volume %s is already bind-mounted to %s", volID, targetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: volume %s is already bind-mounted to %s"), volID, targetPath)
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// It's not, mount now
|
||||
|
||||
if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly(), mountOptions); err != nil {
|
||||
klog.Errorf("failed to bind-mount volume %s: %v", volID, err)
|
||||
if err = bindMount(ctx, req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly(), mountOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to bind-mount volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil {
|
||||
klog.Warningf("mount-cache: failed to publish volume %s %s: %v", volID, targetPath, err)
|
||||
if err = volumeMountCache.nodePublishVolume(ctx, volID, targetPath, req.GetReadonly()); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to publish volume %s %s: %v"), volID, targetPath, err)
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully bind-mounted volume %s to %s"), volID, targetPath)
|
||||
|
||||
err = os.Chmod(targetPath, 0777)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to change targetpath permission for volume %s: %v", volID, err)
|
||||
klog.Errorf(util.Log(ctx, "failed to change targetpath permission for volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
@ -245,12 +245,12 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
targetPath := req.GetTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil {
|
||||
klog.Warningf("mount-cache: failed to unpublish volume %s %s: %v", volID, targetPath, err)
|
||||
if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the bind-mount
|
||||
if err = unmountVolume(targetPath); err != nil {
|
||||
if err = unmountVolume(ctx, targetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
@ -258,7 +258,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully unbinded volume %s from %s", req.GetVolumeId(), targetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully unbinded volume %s from %s"), req.GetVolumeId(), targetPath)
|
||||
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
@ -274,15 +274,15 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
|
||||
klog.Warningf("mount-cache: failed to unstage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the volume
|
||||
if err = unmountVolume(stagingTargetPath); err != nil {
|
||||
if err = unmountVolume(ctx, stagingTargetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully unmounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully unmounted volume %s from %s"), req.GetVolumeId(), stagingTargetPath)
|
||||
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package cephfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@ -33,7 +34,7 @@ import (
|
||||
|
||||
type volumeID string
|
||||
|
||||
func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
|
||||
func execCommand(ctx context.Context, program string, args ...string) (stdout, stderr []byte, err error) {
|
||||
var (
|
||||
cmd = exec.Command(program, args...) // nolint: gosec
|
||||
sanitizedArgs = util.StripSecretInArgs(args)
|
||||
@ -44,7 +45,7 @@ func execCommand(program string, args ...string) (stdout, stderr []byte, err err
|
||||
cmd.Stdout = &stdoutBuf
|
||||
cmd.Stderr = &stderrBuf
|
||||
|
||||
klog.V(4).Infof("cephfs: EXEC %s %s", program, sanitizedArgs)
|
||||
klog.V(4).Infof(util.Log(ctx, "cephfs: EXEC %s %s"), program, sanitizedArgs)
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
if cmd.Process == nil {
|
||||
@ -58,14 +59,14 @@ func execCommand(program string, args ...string) (stdout, stderr []byte, err err
|
||||
return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil
|
||||
}
|
||||
|
||||
func execCommandErr(program string, args ...string) error {
|
||||
_, _, err := execCommand(program, args...)
|
||||
func execCommandErr(ctx context.Context, program string, args ...string) error {
|
||||
_, _, err := execCommand(ctx, program, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
//nolint: unparam
|
||||
func execCommandJSON(v interface{}, program string, args ...string) error {
|
||||
stdout, _, err := execCommand(program, args...)
|
||||
func execCommandJSON(ctx context.Context, v interface{}, program string, args ...string) error {
|
||||
stdout, _, err := execCommand(ctx, program, args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
@ -51,7 +52,7 @@ func getCephRootPathLocalDeprecated(volID volumeID) string {
|
||||
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
|
||||
}
|
||||
|
||||
func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
|
||||
func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
|
||||
stdout, _, err := util.ExecCommand(
|
||||
"ceph",
|
||||
"fs",
|
||||
@ -67,16 +68,17 @@ func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volI
|
||||
"--keyfile="+cr.KeyFile)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err)
|
||||
klog.Errorf(util.Log(ctx, "failed to get the rootpath for the vol %s(%s)"), string(volID), err)
|
||||
return "", err
|
||||
}
|
||||
return strings.TrimSuffix(string(stdout), "\n"), nil
|
||||
}
|
||||
|
||||
func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
|
||||
func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
|
||||
//TODO: When we support multiple fs, need to hande subvolume group create for all fs's
|
||||
if !cephfsInit {
|
||||
err := execCommandErr(
|
||||
ctx,
|
||||
"ceph",
|
||||
"fs",
|
||||
"subvolumegroup",
|
||||
@ -88,10 +90,10 @@ func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeI
|
||||
"-n", cephEntityClientPrefix+cr.ID,
|
||||
"--keyfile="+cr.KeyFile)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err)
|
||||
klog.Errorf(util.Log(ctx, "failed to create subvolume group csi, for the vol %s(%s)"), string(volID), err)
|
||||
return err
|
||||
}
|
||||
klog.V(4).Infof("cephfs: created subvolume group csi")
|
||||
klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group csi"))
|
||||
cephfsInit = true
|
||||
}
|
||||
|
||||
@ -116,17 +118,18 @@ func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeI
|
||||
}
|
||||
|
||||
err := execCommandErr(
|
||||
ctx,
|
||||
"ceph",
|
||||
args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
|
||||
klog.Errorf(util.Log(ctx, "failed to create subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
|
||||
func mountCephRoot(ctx context.Context, volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
|
||||
cephRoot := getCephRootPathLocalDeprecated(volID)
|
||||
|
||||
// Root path is not set for dynamically provisioned volumes
|
||||
@ -142,30 +145,30 @@ func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Cred
|
||||
return fmt.Errorf("failed to create mounter: %v", err)
|
||||
}
|
||||
|
||||
if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
|
||||
if err = m.mount(ctx, cephRoot, adminCr, volOptions); err != nil {
|
||||
return fmt.Errorf("error mounting ceph root: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func unmountCephRoot(volID volumeID) {
|
||||
func unmountCephRoot(ctx context.Context, volID volumeID) {
|
||||
cephRoot := getCephRootPathLocalDeprecated(volID)
|
||||
|
||||
if err := unmountVolume(cephRoot); err != nil {
|
||||
klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
|
||||
if err := unmountVolume(ctx, cephRoot); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to unmount %s with error %s"), cephRoot, err)
|
||||
} else {
|
||||
if err := os.Remove(cephRoot); err != nil {
|
||||
klog.Errorf("failed to remove %s with error %s", cephRoot, err)
|
||||
klog.Errorf(util.Log(ctx, "failed to remove %s with error %s"), cephRoot, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
|
||||
func purgeVolumeDeprecated(ctx context.Context, volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := mountCephRoot(ctx, volID, volOptions, adminCr); err != nil {
|
||||
return err
|
||||
}
|
||||
defer unmountCephRoot(volID)
|
||||
defer unmountCephRoot(ctx, volID)
|
||||
|
||||
var (
|
||||
volRoot = getCephRootVolumePathLocalDeprecated(volID)
|
||||
@ -178,7 +181,7 @@ func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions
|
||||
}
|
||||
} else {
|
||||
if !pathExists(volRootDeleting) {
|
||||
klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
|
||||
klog.V(4).Infof(util.Log(ctx, "cephfs: volume %s not found, assuming it to be already deleted"), volID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -190,8 +193,9 @@ func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions
|
||||
return nil
|
||||
}
|
||||
|
||||
func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
err := execCommandErr(
|
||||
ctx,
|
||||
"ceph",
|
||||
"fs",
|
||||
"subvolume",
|
||||
@ -206,7 +210,7 @@ func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions
|
||||
"-n", cephEntityClientPrefix+cr.ID,
|
||||
"--keyfile="+cr.KeyFile)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to purge subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
|
||||
klog.Errorf(util.Log(ctx, "failed to purge subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
@ -70,7 +71,7 @@ func loadAvailableMounters() error {
|
||||
}
|
||||
|
||||
type volumeMounter interface {
|
||||
mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error
|
||||
mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error
|
||||
name() string
|
||||
}
|
||||
|
||||
@ -114,7 +115,7 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) {
|
||||
|
||||
type fuseMounter struct{}
|
||||
|
||||
func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
args := []string{
|
||||
mountPoint,
|
||||
"-m", volOptions.Monitors,
|
||||
@ -128,7 +129,7 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption
|
||||
args = append(args, "--client_mds_namespace="+volOptions.FsName)
|
||||
}
|
||||
|
||||
_, stderr, err := execCommand("ceph-fuse", args[:]...)
|
||||
_, stderr, err := execCommand(ctx, "ceph-fuse", args[:]...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -154,20 +155,20 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fuseMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
func (m *fuseMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := util.CreateMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mountFuse(mountPoint, cr, volOptions)
|
||||
return mountFuse(ctx, mountPoint, cr, volOptions)
|
||||
}
|
||||
|
||||
func (m *fuseMounter) name() string { return "Ceph FUSE driver" }
|
||||
|
||||
type kernelMounter struct{}
|
||||
|
||||
func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := execCommandErr("modprobe", "ceph"); err != nil {
|
||||
func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := execCommandErr(ctx, "modprobe", "ceph"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -182,28 +183,28 @@ func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOpti
|
||||
}
|
||||
args = append(args, "-o", optionsStr)
|
||||
|
||||
return execCommandErr("mount", args[:]...)
|
||||
return execCommandErr(ctx, "mount", args[:]...)
|
||||
}
|
||||
|
||||
func (m *kernelMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
func (m *kernelMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := util.CreateMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mountKernel(mountPoint, cr, volOptions)
|
||||
return mountKernel(ctx, mountPoint, cr, volOptions)
|
||||
}
|
||||
|
||||
func (m *kernelMounter) name() string { return "Ceph kernel client" }
|
||||
|
||||
func bindMount(from, to string, readOnly bool, mntOptions []string) error {
|
||||
func bindMount(ctx context.Context, from, to string, readOnly bool, mntOptions []string) error {
|
||||
mntOptionSli := strings.Join(mntOptions, ",")
|
||||
if err := execCommandErr("mount", "-o", mntOptionSli, from, to); err != nil {
|
||||
if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, from, to); err != nil {
|
||||
return fmt.Errorf("failed to bind-mount %s to %s: %v", from, to, err)
|
||||
}
|
||||
|
||||
if readOnly {
|
||||
mntOptionSli += ",remount"
|
||||
if err := execCommandErr("mount", "-o", mntOptionSli, to); err != nil {
|
||||
if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, to); err != nil {
|
||||
return fmt.Errorf("failed read-only remount of %s: %v", to, err)
|
||||
}
|
||||
}
|
||||
@ -211,8 +212,8 @@ func bindMount(from, to string, readOnly bool, mntOptions []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func unmountVolume(mountPoint string) error {
|
||||
if err := execCommandErr("umount", mountPoint); err != nil {
|
||||
func unmountVolume(ctx context.Context, mountPoint string) error {
|
||||
if err := execCommandErr(ctx, "umount", mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -226,10 +227,10 @@ func unmountVolume(mountPoint string) error {
|
||||
if ok {
|
||||
p, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to find process %d: %v", pid, err)
|
||||
klog.Warningf(util.Log(ctx, "failed to find process %d: %v"), pid, err)
|
||||
} else {
|
||||
if _, err = p.Wait(); err != nil {
|
||||
klog.Warningf("%d is not a child process: %v", pid, err)
|
||||
klog.Warningf(util.Log(ctx, "%d is not a child process: %v"), pid, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
@ -123,7 +124,7 @@ func getMonsAndClusterID(options map[string]string) (string, string, error) {
|
||||
|
||||
// newVolumeOptions generates a new instance of volumeOptions from the provided
|
||||
// CSI request parameters
|
||||
func newVolumeOptions(requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) {
|
||||
func newVolumeOptions(ctx context.Context, requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) {
|
||||
var (
|
||||
opts volumeOptions
|
||||
err error
|
||||
@ -155,12 +156,12 @@ func newVolumeOptions(requestName string, size int64, volOptions, secret map[str
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
opts.FscID, err = getFscID(opts.Monitors, cr, opts.FsName)
|
||||
opts.FscID, err = getFscID(ctx, opts.Monitors, cr, opts.FsName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts.MetadataPool, err = getMetadataPool(opts.Monitors, cr, opts.FsName)
|
||||
opts.MetadataPool, err = getMetadataPool(ctx, opts.Monitors, cr, opts.FsName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -172,7 +173,7 @@ func newVolumeOptions(requestName string, size int64, volOptions, secret map[str
|
||||
|
||||
// newVolumeOptionsFromVolID generates a new instance of volumeOptions and volumeIdentifier
|
||||
// from the provided CSI VolumeID
|
||||
func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
|
||||
func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
|
||||
var (
|
||||
vi util.CSIIdentifier
|
||||
volOptions volumeOptions
|
||||
@ -201,17 +202,17 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
volOptions.FsName, err = getFsName(volOptions.Monitors, cr, volOptions.FscID)
|
||||
volOptions.FsName, err = getFsName(ctx, volOptions.Monitors, cr, volOptions.FscID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
volOptions.MetadataPool, err = getMetadataPool(volOptions.Monitors, cr, volOptions.FsName)
|
||||
volOptions.MetadataPool, err = getMetadataPool(ctx, volOptions.Monitors, cr, volOptions.FsName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(volOptions.Monitors, cr,
|
||||
volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
|
||||
volOptions.MetadataPool, vi.ObjectUUID, false)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -227,7 +228,7 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
|
||||
}
|
||||
}
|
||||
|
||||
volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
volOptions.RootPath, err = getVolumeRootPathCeph(ctx, &volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package csicommon
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
@ -58,7 +60,7 @@ func (cs *DefaultControllerServer) GetCapacity(ctx context.Context, req *csi.Get
|
||||
// ControllerGetCapabilities implements the default GRPC callout.
|
||||
// Default supports all capabilities
|
||||
func (cs *DefaultControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
|
||||
klog.V(5).Infof("Using default ControllerGetCapabilities")
|
||||
klog.V(5).Infof(util.Log(ctx, "Using default ControllerGetCapabilities"))
|
||||
|
||||
return &csi.ControllerGetCapabilitiesResponse{
|
||||
Capabilities: cs.Driver.cap,
|
||||
|
@ -19,6 +19,8 @@ package csicommon
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
@ -32,7 +34,7 @@ type DefaultIdentityServer struct {
|
||||
|
||||
// GetPluginInfo returns plugin information
|
||||
func (ids *DefaultIdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
|
||||
klog.V(5).Infof("Using default GetPluginInfo")
|
||||
klog.V(5).Infof(util.Log(ctx, "Using default GetPluginInfo"))
|
||||
|
||||
if ids.Driver.name == "" {
|
||||
return nil, status.Error(codes.Unavailable, "Driver name not configured")
|
||||
@ -55,7 +57,7 @@ func (ids *DefaultIdentityServer) Probe(ctx context.Context, req *csi.ProbeReque
|
||||
|
||||
// GetPluginCapabilities returns plugin capabilities
|
||||
func (ids *DefaultIdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
||||
klog.V(5).Infof("Using default capabilities")
|
||||
klog.V(5).Infof(util.Log(ctx, "Using default capabilities"))
|
||||
return &csi.GetPluginCapabilitiesResponse{
|
||||
Capabilities: []*csi.PluginCapability{
|
||||
{
|
||||
|
@ -55,7 +55,7 @@ func (ns *DefaultNodeServer) NodeExpandVolume(ctx context.Context, req *csi.Node
|
||||
|
||||
// NodeGetInfo returns node ID
|
||||
func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
|
||||
klog.V(5).Infof("Using default NodeGetInfo")
|
||||
klog.V(5).Infof(util.Log(ctx, "Using default NodeGetInfo"))
|
||||
|
||||
return &csi.NodeGetInfoResponse{
|
||||
NodeId: ns.Driver.nodeID,
|
||||
@ -64,7 +64,7 @@ func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetIn
|
||||
|
||||
// NodeGetCapabilities returns RPC unknow capability
|
||||
func (ns *DefaultNodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||
klog.V(5).Infof("Using default NodeGetCapabilities")
|
||||
klog.V(5).Infof(util.Log(ctx, "Using default NodeGetCapabilities"))
|
||||
|
||||
return &csi.NodeGetCapabilitiesResponse{
|
||||
Capabilities: []*csi.NodeServiceCapability{
|
||||
@ -129,31 +129,31 @@ func (ns *DefaultNodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.No
|
||||
|
||||
available, ok := (*(volMetrics.Available)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch available bytes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch available bytes"))
|
||||
}
|
||||
capacity, ok := (*(volMetrics.Capacity)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch capacity bytes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch capacity bytes"))
|
||||
return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes")
|
||||
}
|
||||
used, ok := (*(volMetrics.Used)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch used bytes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch used bytes"))
|
||||
}
|
||||
inodes, ok := (*(volMetrics.Inodes)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch available inodes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch available inodes"))
|
||||
return nil, status.Error(codes.Unknown, "failed to fetch available inodes")
|
||||
|
||||
}
|
||||
inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch free inodes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch free inodes"))
|
||||
}
|
||||
|
||||
inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64()
|
||||
if !ok {
|
||||
klog.Errorf("failed to fetch used inodes")
|
||||
klog.Errorf(util.Log(ctx, "failed to fetch used inodes"))
|
||||
}
|
||||
return &csi.NodeGetVolumeStatsResponse{
|
||||
Usage: []*csi.VolumeUsage{
|
||||
|
@ -150,7 +150,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errDefer := undoVolReservation(rbdVol, cr)
|
||||
errDefer := undoVolReservation(ctx, rbdVol, cr)
|
||||
if errDefer != nil {
|
||||
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer)
|
||||
}
|
||||
@ -326,7 +326,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
idLk := volumeNameLocker.Lock(rbdVol.RequestName)
|
||||
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
|
||||
|
||||
if err := undoVolReservation(rbdVol, cr); err != nil {
|
||||
if err := undoVolReservation(ctx, rbdVol, cr); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
@ -345,7 +345,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err := undoVolReservation(rbdVol, cr); err != nil {
|
||||
if err := undoVolReservation(ctx, rbdVol, cr); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)"),
|
||||
rbdVol.RequestName, rbdVol.RbdImageName, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -444,7 +444,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errDefer := undoSnapReservation(rbdSnap, cr)
|
||||
errDefer := undoSnapReservation(ctx, rbdSnap, cr)
|
||||
if errDefer != nil {
|
||||
klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s %v"), req.GetName(), errDefer)
|
||||
}
|
||||
@ -568,7 +568,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
||||
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
|
||||
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
|
||||
|
||||
if err = undoSnapReservation(rbdSnap, cr); err != nil {
|
||||
if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return &csi.DeleteSnapshotResponse{}, nil
|
||||
|
@ -114,7 +114,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent
|
||||
return false, err
|
||||
}
|
||||
|
||||
snapUUID, err := snapJournal.CheckReservation(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
snapUUID, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.RequestName, rbdSnap.RbdImageName)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -128,7 +128,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent
|
||||
err = updateSnapWithImageInfo(ctx, rbdSnap, cr)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrSnapNotFound); ok {
|
||||
err = snapJournal.UndoReservation(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.RbdSnapName, rbdSnap.RequestName)
|
||||
return false, err
|
||||
}
|
||||
@ -136,7 +136,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent
|
||||
}
|
||||
|
||||
// found a snapshot already available, process and return its information
|
||||
rbdSnap.SnapID, err = util.GenerateVolID(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.ClusterID, snapUUID, volIDVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -162,7 +162,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
|
||||
return false, err
|
||||
}
|
||||
|
||||
imageUUID, err := volJournal.CheckReservation(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
imageUUID, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.RequestName, "")
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -179,7 +179,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
|
||||
err = updateVolWithImageInfo(ctx, rbdVol, cr)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrImageNotFound); ok {
|
||||
err = volJournal.UndoReservation(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.RbdImageName, rbdVol.RequestName)
|
||||
return false, err
|
||||
}
|
||||
@ -195,7 +195,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
|
||||
// TODO: We should also ensure image features and format is the same
|
||||
|
||||
// found a volume already available, process and return it!
|
||||
rbdVol.VolID, err = util.GenerateVolID(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.ClusterID, imageUUID, volIDVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -210,13 +210,13 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
|
||||
// reserveSnap is a helper routine to request a rbdSnapshot name reservation and generate the
|
||||
// volume ID for the generated name
|
||||
func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
|
||||
snapUUID, err := snapJournal.ReserveName(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
snapUUID, err := snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.RequestName, rbdSnap.RbdImageName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rbdSnap.SnapID, err = util.GenerateVolID(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.ClusterID, snapUUID, volIDVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -233,13 +233,13 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials
|
||||
// reserveVol is a helper routine to request a rbdVolume name reservation and generate the
|
||||
// volume ID for the generated name
|
||||
func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
|
||||
imageUUID, err := volJournal.ReserveName(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
imageUUID, err := volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.RequestName, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rbdVol.VolID, err = util.GenerateVolID(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.ClusterID, imageUUID, volIDVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -254,16 +254,16 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) er
|
||||
}
|
||||
|
||||
// undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot
|
||||
func undoSnapReservation(rbdSnap *rbdSnapshot, cr *util.Credentials) error {
|
||||
err := snapJournal.UndoReservation(rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
|
||||
err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
|
||||
rbdSnap.RbdSnapName, rbdSnap.RequestName)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// undoVolReservation is a helper routine to undo a name reservation for rbdVolume
|
||||
func undoVolReservation(rbdVol *rbdVolume, cr *util.Credentials) error {
|
||||
err := volJournal.UndoReservation(rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
|
||||
err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
|
||||
rbdVol.RbdImageName, rbdVol.RequestName)
|
||||
|
||||
return err
|
||||
|
@ -298,12 +298,12 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
|
||||
return err
|
||||
}
|
||||
|
||||
rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, cr, vi.LocationID)
|
||||
rbdSnap.Pool, err = util.GetPoolName(ctx, rbdSnap.Monitors, cr, vi.LocationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rbdSnap.RequestName, rbdSnap.RbdImageName, err = snapJournal.GetObjectUUIDData(rbdSnap.Monitors,
|
||||
rbdSnap.RequestName, rbdSnap.RbdImageName, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors,
|
||||
cr, rbdSnap.Pool, vi.ObjectUUID, true)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -342,12 +342,12 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr
|
||||
return err
|
||||
}
|
||||
|
||||
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
|
||||
rbdVol.Pool, err = util.GetPoolName(ctx, rbdVol.Monitors, cr, vi.LocationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rbdVol.RequestName, _, err = volJournal.GetObjectUUIDData(rbdVol.Monitors, cr,
|
||||
rbdVol.RequestName, _, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr,
|
||||
rbdVol.Pool, vi.ObjectUUID, false)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -594,7 +594,7 @@ func deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credential
|
||||
return errors.Wrapf(err, "failed to delete snapshot, command output: %s", string(output))
|
||||
}
|
||||
|
||||
if err := undoSnapReservation(pOpts, cr); err != nil {
|
||||
if err := undoSnapReservation(ctx, pOpts, cr); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) on image (%s) (%s)"),
|
||||
pOpts.RequestName, pOpts.RbdSnapName, pOpts.RbdImageName, err)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@ -55,7 +56,7 @@ type cephStoragePoolSummary struct {
|
||||
}
|
||||
|
||||
// GetPools fetches a list of pools from a cluster
|
||||
func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error) {
|
||||
func getPools(ctx context.Context, monitors string, cr *Credentials) ([]cephStoragePoolSummary, error) {
|
||||
// ceph <options> -f json osd lspools
|
||||
// JSON out: [{"poolnum":<int64>,"poolname":<string>}]
|
||||
|
||||
@ -68,14 +69,14 @@ func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error
|
||||
"-f", "json",
|
||||
"osd", "lspools")
|
||||
if err != nil {
|
||||
klog.Errorf("failed getting pool list from cluster (%s)", err)
|
||||
klog.Errorf(Log(ctx, "failed getting pool list from cluster (%s)"), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var pools []cephStoragePoolSummary
|
||||
err = json.Unmarshal(stdout, &pools)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to parse JSON output of pool list from cluster (%s)", err)
|
||||
klog.Errorf(Log(ctx, "failed to parse JSON output of pool list from cluster (%s)"), err)
|
||||
return nil, fmt.Errorf("unmarshal of pool list failed: %+v. raw buffer response: %s", err, string(stdout))
|
||||
}
|
||||
|
||||
@ -84,8 +85,8 @@ func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error
|
||||
|
||||
// GetPoolID searches a list of pools in a cluster and returns the ID of the pool that matches
|
||||
// the passed in poolName parameter
|
||||
func GetPoolID(monitors string, cr *Credentials, poolName string) (int64, error) {
|
||||
pools, err := getPools(monitors, cr)
|
||||
func GetPoolID(ctx context.Context, monitors string, cr *Credentials, poolName string) (int64, error) {
|
||||
pools, err := getPools(ctx, monitors, cr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -101,8 +102,8 @@ func GetPoolID(monitors string, cr *Credentials, poolName string) (int64, error)
|
||||
|
||||
// GetPoolName lists all pools in a ceph cluster, and matches the pool whose pool ID is equal to
|
||||
// the requested poolID parameter
|
||||
func GetPoolName(monitors string, cr *Credentials, poolID int64) (string, error) {
|
||||
pools, err := getPools(monitors, cr)
|
||||
func GetPoolName(ctx context.Context, monitors string, cr *Credentials, poolID int64) (string, error) {
|
||||
pools, err := getPools(ctx, monitors, cr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -117,7 +118,7 @@ func GetPoolName(monitors string, cr *Credentials, poolID int64) (string, error)
|
||||
}
|
||||
|
||||
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name
|
||||
func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error {
|
||||
func SetOMapKeyValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error {
|
||||
// Command: "rados <options> setomapval oMapName oMapKey keyValue"
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
@ -134,8 +135,8 @@ func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMap
|
||||
|
||||
_, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed adding key (%s with value %s), to omap (%s) in "+
|
||||
"pool (%s): (%v)", oMapKey, keyValue, oMapName, poolName, err)
|
||||
klog.Errorf(Log(ctx, "failed adding key (%s with value %s), to omap (%s) in "+
|
||||
"pool (%s): (%v)"), oMapKey, keyValue, oMapName, poolName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -143,12 +144,12 @@ func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMap
|
||||
}
|
||||
|
||||
// GetOMapValue gets the value for the given key from the named omap
|
||||
func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) (string, error) {
|
||||
func GetOMapValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) (string, error) {
|
||||
// Command: "rados <options> getomapval oMapName oMapKey <outfile>"
|
||||
// No such key: replicapool/csi.volumes.directory.default/csi.volname
|
||||
tmpFile, err := ioutil.TempFile("", "omap-get-")
|
||||
if err != nil {
|
||||
klog.Errorf("failed creating a temporary file for key contents")
|
||||
klog.Errorf(Log(ctx, "failed creating a temporary file for key contents"))
|
||||
return "", err
|
||||
}
|
||||
defer tmpFile.Close()
|
||||
@ -182,7 +183,7 @@ func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapNam
|
||||
}
|
||||
|
||||
// log other errors for troubleshooting assistance
|
||||
klog.Errorf("failed getting omap value for key (%s) from omap (%s) in pool (%s): (%v)",
|
||||
klog.Errorf(Log(ctx, "failed getting omap value for key (%s) from omap (%s) in pool (%s): (%v)"),
|
||||
oMapKey, oMapName, poolName, err)
|
||||
|
||||
return "", fmt.Errorf("error (%v) occurred, command output streams is (%s)",
|
||||
@ -194,7 +195,7 @@ func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapNam
|
||||
}
|
||||
|
||||
// RemoveOMapKey removes the omap key from the given omap name
|
||||
func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) error {
|
||||
func RemoveOMapKey(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) error {
|
||||
// Command: "rados <options> rmomapkey oMapName oMapKey"
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
@ -212,8 +213,8 @@ func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapNa
|
||||
_, _, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
// NOTE: Missing omap key removal does not return an error
|
||||
klog.Errorf("failed removing key (%s), from omap (%s) in "+
|
||||
"pool (%s): (%v)", oMapKey, oMapName, poolName, err)
|
||||
klog.Errorf(Log(ctx, "failed removing key (%s), from omap (%s) in "+
|
||||
"pool (%s): (%v)"), oMapKey, oMapName, poolName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -222,7 +223,7 @@ func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapNa
|
||||
|
||||
// CreateObject creates the object name passed in and returns ErrObjectExists if the provided object
|
||||
// is already present in rados
|
||||
func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectName string) error {
|
||||
func CreateObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, objectName string) error {
|
||||
// Command: "rados <options> create objectName"
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
@ -239,7 +240,7 @@ func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectN
|
||||
|
||||
_, stderr, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed creating omap (%s) in pool (%s): (%v)", objectName, poolName, err)
|
||||
klog.Errorf(Log(ctx, "failed creating omap (%s) in pool (%s): (%v)"), objectName, poolName, err)
|
||||
if strings.Contains(string(stderr), "error creating "+poolName+"/"+objectName+
|
||||
": (17) File exists") {
|
||||
return ErrObjectExists{objectName, err}
|
||||
@ -252,7 +253,7 @@ func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectN
|
||||
|
||||
// RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound is provided omap
|
||||
// is not found in rados
|
||||
func RemoveObject(monitors string, cr *Credentials, poolName, namespace, oMapName string) error {
|
||||
func RemoveObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName string) error {
|
||||
// Command: "rados <options> rm oMapName"
|
||||
args := []string{
|
||||
"-m", monitors,
|
||||
@ -269,7 +270,7 @@ func RemoveObject(monitors string, cr *Credentials, poolName, namespace, oMapNam
|
||||
|
||||
_, stderr, err := ExecCommand("rados", args[:]...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed removing omap (%s) in pool (%s): (%v)", oMapName, poolName, err)
|
||||
klog.Errorf(Log(ctx, "failed removing omap (%s) in pool (%s): (%v)"), oMapName, poolName, err)
|
||||
if strings.Contains(string(stderr), "error removing "+poolName+">"+oMapName+
|
||||
": (2) No such file or directory") {
|
||||
return ErrObjectNotFound{oMapName, err}
|
||||
|
@ -1,12 +1,9 @@
|
||||
/*
|
||||
Copyright 2019 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -28,6 +25,10 @@ var Key = contextKey("ID")
|
||||
|
||||
// Log helps in context based logging
|
||||
func Log(ctx context.Context, format string) string {
|
||||
a := fmt.Sprintf("ID: %v ", ctx.Value(Key))
|
||||
id := ctx.Value(Key)
|
||||
if id == nil {
|
||||
return format
|
||||
}
|
||||
a := fmt.Sprintf("ID: %v ", id)
|
||||
return a + format
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
@ -131,8 +132,8 @@ func ValidateDriverName(driverName string) error {
|
||||
|
||||
// GenerateVolID generates a volume ID based on passed in parameters and version, to be returned
|
||||
// to the CO system
|
||||
func GenerateVolID(monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
|
||||
poolID, err := GetPoolID(monitors, cr, pool)
|
||||
func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
|
||||
poolID, err := GetPoolID(ctx, monitors, cr, pool)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
@ -175,7 +176,7 @@ Return values:
|
||||
there was no reservation found
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) {
|
||||
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) {
|
||||
var snapSource bool
|
||||
|
||||
if parentName != "" {
|
||||
@ -187,7 +188,7 @@ func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, r
|
||||
}
|
||||
|
||||
// check if request name is already part of the directory omap
|
||||
objUUID, err := GetOMapValue(monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
objUUID, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for volume to be absent, any other error
|
||||
@ -198,13 +199,13 @@ func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, r
|
||||
return "", err
|
||||
}
|
||||
|
||||
savedReqName, savedReqParentName, err := cj.GetObjectUUIDData(monitors, cr, pool,
|
||||
savedReqName, savedReqParentName, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool,
|
||||
objUUID, snapSource)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for image to be absent, any other error
|
||||
// is not conclusive, and we should not proceed
|
||||
if _, ok := err.(ErrKeyNotFound); ok {
|
||||
err = cj.UndoReservation(monitors, cr, pool, cj.namingPrefix+objUUID, reqName)
|
||||
err = cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+objUUID, reqName)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
@ -243,23 +244,23 @@ prior to cleaning up the reservation
|
||||
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
|
||||
held, to prevent parallel operations from modifying the state of the omaps for this request name.
|
||||
*/
|
||||
func (cj *CSIJournal) UndoReservation(monitors string, cr *Credentials, pool, volName, reqName string) error {
|
||||
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, pool, volName, reqName string) error {
|
||||
// delete volume UUID omap (first, inverse of create order)
|
||||
// TODO: Check cases where volName can be empty, and we need to just cleanup the reqName
|
||||
imageUUID := strings.TrimPrefix(volName, cj.namingPrefix)
|
||||
err := RemoveObject(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
err := RemoveObject(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectNotFound); !ok {
|
||||
klog.Errorf("failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+imageUUID, err)
|
||||
klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// delete the request name key (last, inverse of create order)
|
||||
err = RemoveOMapKey(monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
err = RemoveOMapKey(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
|
||||
klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -269,7 +270,7 @@ func (cj *CSIJournal) UndoReservation(monitors string, cr *Credentials, pool, vo
|
||||
// reserveOMapName creates an omap with passed in oMapNamePrefix and a generated <uuid>.
|
||||
// It ensures generated omap name does not already exist and if conflicts are detected, a set
|
||||
// number of retires with newer uuids are attempted before returning an error
|
||||
func reserveOMapName(monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) {
|
||||
func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) {
|
||||
var iterUUID string
|
||||
|
||||
maxAttempts := 5
|
||||
@ -278,12 +279,12 @@ func reserveOMapName(monitors string, cr *Credentials, pool, namespace, oMapName
|
||||
// generate a uuid for the image name
|
||||
iterUUID = uuid.NewUUID().String()
|
||||
|
||||
err := CreateObject(monitors, cr, pool, namespace, oMapNamePrefix+iterUUID)
|
||||
err := CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectExists); ok {
|
||||
attempt++
|
||||
// try again with a different uuid, for maxAttempts tries
|
||||
klog.V(4).Infof("uuid (%s) conflict detected, retrying (attempt %d of %d)",
|
||||
klog.V(4).Infof(Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"),
|
||||
iterUUID, attempt, maxAttempts)
|
||||
continue
|
||||
}
|
||||
@ -310,7 +311,7 @@ Return values:
|
||||
- string: Contains the UUID that was reserved for the passed in reqName
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) {
|
||||
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) {
|
||||
var snapSource bool
|
||||
|
||||
if parentName != "" {
|
||||
@ -325,31 +326,31 @@ func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqNam
|
||||
// NOTE: If any service loss occurs post creation of the UUID directory, and before
|
||||
// setting the request name key (csiNameKey) to point back to the UUID directory, the
|
||||
// UUID directory key will be leaked
|
||||
volUUID, err := reserveOMapName(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
|
||||
volUUID, err := reserveOMapName(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Create request name (csiNameKey) key in csiDirectory and store the UUId based
|
||||
// volume name into it
|
||||
err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName, volUUID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
klog.Warningf("reservation failed for volume: %s", reqName)
|
||||
errDefer := cj.UndoReservation(monitors, cr, pool, cj.namingPrefix+volUUID,
|
||||
klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName)
|
||||
errDefer := cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+volUUID,
|
||||
reqName)
|
||||
if errDefer != nil {
|
||||
klog.Warningf("failed undoing reservation of volume: %s (%v)", reqName, errDefer)
|
||||
klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update UUID directory to store CSI request name
|
||||
err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.csiNameKey, reqName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -357,7 +358,7 @@ func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqNam
|
||||
|
||||
if snapSource {
|
||||
// Update UUID directory to store source volume UUID in case of snapshots
|
||||
err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.cephSnapSourceKey, parentName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -374,7 +375,7 @@ Return values:
|
||||
- string: Contains the parent image name for the passed in UUID, if it is a snapshot
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (cj *CSIJournal) GetObjectUUIDData(monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, error) {
|
||||
func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, error) {
|
||||
var sourceName string
|
||||
|
||||
if snapSource && cj.cephSnapSourceKey == "" {
|
||||
@ -383,14 +384,14 @@ func (cj *CSIJournal) GetObjectUUIDData(monitors string, cr *Credentials, pool,
|
||||
}
|
||||
|
||||
// TODO: fetch all omap vals in one call, than make multiple listomapvals
|
||||
requestName, err := GetOMapValue(monitors, cr, pool, cj.namespace,
|
||||
requestName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
if snapSource {
|
||||
sourceName, err = GetOMapValue(monitors, cr, pool, cj.namespace,
|
||||
sourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
|
Loading…
Reference in New Issue
Block a user