From c8271fe64cb1f7036edc1b55662907afc4fcfd87 Mon Sep 17 00:00:00 2001 From: John Mulligan Date: Thu, 23 Apr 2020 14:22:55 -0400 Subject: [PATCH] journal: move voljournal.go to a new package This new journal package isolates journal logic from the rest of util and helps draw bright lines between what is a generic utility function and what is csi journal logic. Done partly as preparation for making use of go-ceph in journal. No functional changes are made except to update references to allow the code to compile. Signed-off-by: John Mulligan --- internal/cephfs/driver.go | 5 +- internal/{util => journal}/voljournal.go | 82 ++++++++++++------------ internal/rbd/driver.go | 9 +-- internal/rbd/nodeserver.go | 3 +- 4 files changed, 52 insertions(+), 47 deletions(-) rename internal/{util => journal}/voljournal.go (87%) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index bea2650fa..9c3176f38 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -20,6 +20,7 @@ import ( "k8s.io/klog" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" @@ -55,7 +56,7 @@ var ( // volJournal is used to maintain RADOS based journals for CO generated // VolumeName to backing CephFS subvolumes - volJournal *util.CSIJournal + volJournal *journal.CSIJournal ) // NewDriver returns new ceph driver @@ -109,7 +110,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { CSIInstanceID = conf.InstanceID } // Get an instance of the volume journal - volJournal = util.NewCSIVolumeJournal() + volJournal = journal.NewCSIVolumeJournal() // Update keys with CSI instance suffix volJournal.SetCSIDirectorySuffix(CSIInstanceID) diff --git a/internal/util/voljournal.go b/internal/journal/voljournal.go similarity index 87% rename from internal/util/voljournal.go rename to internal/journal/voljournal.go index 8c0f4101d..5df20200e 100644 --- a/internal/util/voljournal.go +++ b/internal/journal/voljournal.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package journal import ( "context" @@ -23,6 +23,8 @@ import ( "fmt" "strings" + "github.com/ceph/ceph-csi/internal/util" + "github.com/pborman/uuid" "github.com/pkg/errors" "k8s.io/klog" @@ -219,13 +221,13 @@ Return values: there was no reservation found - error: non-nil in case of any errors */ -func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, +func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *util.Credentials, journalPool, reqName, namePrefix, parentName, kmsConfig string) (*ImageData, error) { var ( snapSource bool objUUID string savedImagePool string - savedImagePoolID int64 = InvalidPoolID + savedImagePoolID int64 = util.InvalidPoolID ) if parentName != "" { @@ -237,13 +239,13 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr } // check if request name is already part of the directory omap - objUUIDAndPool, err := GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, + objUUIDAndPool, err := util.GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { // error should specifically be not found, for volume to be absent, any other error // is not conclusive, and we should not proceed switch err.(type) { - case ErrKeyNotFound, ErrPoolNotFound: + case util.ErrKeyNotFound, util.ErrPoolNotFound: return nil, nil } return nil, err @@ -265,9 +267,9 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr } savedImagePoolID = int64(binary.BigEndian.Uint64(buf64)) - savedImagePool, err = GetPoolName(ctx, monitors, cr, savedImagePoolID) + savedImagePool, err = util.GetPoolName(ctx, monitors, cr, savedImagePoolID) if err != nil { - if _, ok := err.(ErrPoolNotFound); ok { + if _, ok := err.(util.ErrPoolNotFound); ok { err = cj.UndoReservation(ctx, monitors, cr, journalPool, "", "", reqName) } return nil, err @@ -279,7 +281,7 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed - if _, ok := err.(ErrKeyNotFound); ok { + if _, ok := err.(util.ErrKeyNotFound); ok { err = cj.UndoReservation(ctx, monitors, cr, journalPool, savedImagePool, cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName) } @@ -315,7 +317,7 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr err = fmt.Errorf("snapname points to different volume, request name (%s)"+ " source name (%s) saved source name (%s)", reqName, parentName, savedImageAttributes.SourceName) - return nil, ErrSnapNameConflict{reqName, err} + return nil, util.NewErrSnapNameConflict(reqName, err) } } @@ -344,7 +346,7 @@ Input arguments: - volJournalPool: Pool name that holds the image/subvolume and the per-image journal (may be different if image is created in a topology constrained pool) */ -func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, +func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *util.Credentials, csiJournalPool, volJournalPool, volName, reqName string) error { // delete volume UUID omap (first, inverse of create order) @@ -358,20 +360,20 @@ func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr * return fmt.Errorf("failed parsing UUID in %s", volName) } - err := RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) + err := util.RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) if err != nil { - if _, ok := err.(ErrObjectNotFound); !ok { - klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err) + if _, ok := err.(util.ErrObjectNotFound); !ok { + klog.Errorf(util.Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err) return err } } } // delete the request name key (last, inverse of create order) - err := RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory, + err := util.RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { - klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err) + klog.Errorf(util.Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err) return err } @@ -381,7 +383,7 @@ func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr * // reserveOMapName creates an omap with passed in oMapNamePrefix and a generated . // It ensures generated omap name does not already exist and if conflicts are detected, a set // number of retires with newer uuids are attempted before returning an error -func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) { +func reserveOMapName(ctx context.Context, monitors string, cr *util.Credentials, pool, namespace, oMapNamePrefix string) (string, error) { var iterUUID string maxAttempts := 5 @@ -390,12 +392,12 @@ func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool // generate a uuid for the image name iterUUID = uuid.NewUUID().String() - err := CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID) + err := util.CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID) if err != nil { - if _, ok := err.(ErrObjectExists); ok { + if _, ok := err.(util.ErrObjectExists); ok { attempt++ // try again with a different uuid, for maxAttempts tries - klog.V(4).Infof(Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"), + klog.V(4).Infof(util.Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"), iterUUID, attempt, maxAttempts) continue } @@ -433,7 +435,7 @@ Return values: - string: Contains the image name that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, +func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *util.Credentials, journalPool string, journalPoolID int64, imagePool string, imagePoolID int64, reqName, namePrefix, parentName, kmsConf string) (string, string, error) { @@ -464,7 +466,7 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred // Create request name (csiNameKey) key in csiDirectory and store the UUID based // volume name and optionally the image pool location into it - if journalPool != imagePool && imagePoolID != InvalidPoolID { + if journalPool != imagePool && imagePoolID != util.InvalidPoolID { buf64 := make([]byte, 8) binary.BigEndian.PutUint64(buf64, uint64(imagePoolID)) poolIDEncodedHex := hex.EncodeToString(buf64) @@ -473,17 +475,17 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred nameKeyVal = volUUID } - err = SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, + err = util.SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName, nameKeyVal) if err != nil { return "", "", err } defer func() { if err != nil { - klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName) + klog.Warningf(util.Log(ctx, "reservation failed for volume: %s"), reqName) errDefer := cj.UndoReservation(ctx, monitors, cr, imagePool, journalPool, imageName, reqName) if errDefer != nil { - klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer) + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer) } } }() @@ -492,14 +494,14 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred // and also CSI journal pool, when only the VolumeID is passed in (e.g DeleteVolume/DeleteSnapshot, // VolID during CreateSnapshot). // Update UUID directory to store CSI request name - err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiNameKey, reqName) if err != nil { return "", "", err } // Update UUID directory to store image name - err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiImageKey, imageName) if err != nil { return "", "", err @@ -507,20 +509,20 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred // Update UUID directory to store encryption values if kmsConf != "" { - err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.encryptKMSKey, kmsConf) if err != nil { return "", "", err } } - if journalPool != imagePool && journalPoolID != InvalidPoolID { + if journalPool != imagePool && journalPoolID != util.InvalidPoolID { buf64 := make([]byte, 8) binary.BigEndian.PutUint64(buf64, uint64(journalPoolID)) journalPoolIDStr := hex.EncodeToString(buf64) // Update UUID directory to store CSI journal pool name (prefer ID instead of name to be pool rename proof) - err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiJournalPool, journalPoolIDStr) if err != nil { return "", "", err @@ -529,7 +531,7 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred if snapSource { // Update UUID directory to store source volume UUID in case of snapshots - err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.cephSnapSourceKey, parentName) if err != nil { return "", "", err @@ -549,7 +551,7 @@ type ImageAttributes struct { } // GetImageAttributes fetches all keys and their values, from a UUID directory, returning ImageAttributes structure -func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) { +func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *util.Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) { var ( err error imageAttributes *ImageAttributes = &ImageAttributes{} @@ -561,7 +563,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c } // TODO: fetch all omap vals in one call, than make multiple listomapvals - imageAttributes.RequestName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.RequestName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey) if err != nil { return nil, err @@ -569,7 +571,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c // image key was added at some point, so not all volumes will have this key set // when ceph-csi was upgraded - imageAttributes.ImageName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.ImageName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey) if err != nil { // if the key was not found, assume the default key + UUID @@ -577,7 +579,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c switch err.(type) { default: return nil, err - case ErrKeyNotFound, ErrPoolNotFound: + case util.ErrKeyNotFound, util.ErrPoolNotFound: } if snapSource { @@ -587,7 +589,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c } } - imageAttributes.KmsID, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.KmsID, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey) if err != nil { // ErrKeyNotFound means no encryption KMS was used @@ -595,17 +597,17 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c default: return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s", pool, cj.cephUUIDDirectoryPrefix+objectUUID, err) - case ErrKeyNotFound, ErrPoolNotFound: + case util.ErrKeyNotFound, util.ErrPoolNotFound: } } - journalPoolIDStr, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + journalPoolIDStr, err := util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool) if err != nil { - if _, ok := err.(ErrKeyNotFound); !ok { + if _, ok := err.(util.ErrKeyNotFound); !ok { return nil, err } - imageAttributes.JournalPoolID = InvalidPoolID + imageAttributes.JournalPoolID = util.InvalidPoolID } else { var buf64 []byte buf64, err = hex.DecodeString(journalPoolIDStr) @@ -616,7 +618,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c } if snapSource { - imageAttributes.SourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.SourceName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey) if err != nil { return nil, err diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go index 3109eb95d..f203007a4 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -18,6 +18,7 @@ package rbd import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" @@ -50,8 +51,8 @@ var ( // volJournal and snapJournal are used to maintain RADOS based journals for CO generated // VolumeName to backing RBD images - volJournal *util.CSIJournal - snapJournal *util.CSIJournal + volJournal *journal.CSIJournal + snapJournal *journal.CSIJournal ) // NewDriver returns new rbd driver @@ -103,8 +104,8 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { } // Get an instance of the volume and snapshot journal keys - volJournal = util.NewCSIVolumeJournal() - snapJournal = util.NewCSISnapshotJournal() + volJournal = journal.NewCSIVolumeJournal() + snapJournal = journal.NewCSISnapshotJournal() // Update keys with CSI instance suffix volJournal.SetCSIDirectorySuffix(CSIInstanceID) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index d018ea76c..384994de1 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -24,6 +24,7 @@ import ( "strings" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" @@ -145,7 +146,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } default: var vi util.CSIIdentifier - var imageAttributes *util.ImageAttributes + var imageAttributes *journal.ImageAttributes err = vi.DecomposeCSIID(volID) if err != nil { err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)