mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-17 20:00:23 +00:00
journal: move voljournal.go to a new package
This new journal package isolates journal logic from the rest of util and helps draw bright lines between what is a generic utility function and what is csi journal logic. Done partly as preparation for making use of go-ceph in journal. No functional changes are made except to update references to allow the code to compile. Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
parent
22d1476bba
commit
c8271fe64c
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/klog"
|
||||
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
"github.com/ceph/ceph-csi/internal/journal"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@ -55,7 +56,7 @@ var (
|
||||
|
||||
// volJournal is used to maintain RADOS based journals for CO generated
|
||||
// VolumeName to backing CephFS subvolumes
|
||||
volJournal *util.CSIJournal
|
||||
volJournal *journal.CSIJournal
|
||||
)
|
||||
|
||||
// NewDriver returns new ceph driver
|
||||
@ -109,7 +110,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
CSIInstanceID = conf.InstanceID
|
||||
}
|
||||
// Get an instance of the volume journal
|
||||
volJournal = util.NewCSIVolumeJournal()
|
||||
volJournal = journal.NewCSIVolumeJournal()
|
||||
|
||||
// Update keys with CSI instance suffix
|
||||
volJournal.SetCSIDirectorySuffix(CSIInstanceID)
|
||||
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
package journal
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -23,6 +23,8 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
|
||||
"github.com/pborman/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/klog"
|
||||
@ -219,13 +221,13 @@ Return values:
|
||||
there was no reservation found
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials,
|
||||
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *util.Credentials,
|
||||
journalPool, reqName, namePrefix, parentName, kmsConfig string) (*ImageData, error) {
|
||||
var (
|
||||
snapSource bool
|
||||
objUUID string
|
||||
savedImagePool string
|
||||
savedImagePoolID int64 = InvalidPoolID
|
||||
savedImagePoolID int64 = util.InvalidPoolID
|
||||
)
|
||||
|
||||
if parentName != "" {
|
||||
@ -237,13 +239,13 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
|
||||
}
|
||||
|
||||
// check if request name is already part of the directory omap
|
||||
objUUIDAndPool, err := GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
|
||||
objUUIDAndPool, err := util.GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for volume to be absent, any other error
|
||||
// is not conclusive, and we should not proceed
|
||||
switch err.(type) {
|
||||
case ErrKeyNotFound, ErrPoolNotFound:
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
@ -265,9 +267,9 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
|
||||
}
|
||||
savedImagePoolID = int64(binary.BigEndian.Uint64(buf64))
|
||||
|
||||
savedImagePool, err = GetPoolName(ctx, monitors, cr, savedImagePoolID)
|
||||
savedImagePool, err = util.GetPoolName(ctx, monitors, cr, savedImagePoolID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrPoolNotFound); ok {
|
||||
if _, ok := err.(util.ErrPoolNotFound); ok {
|
||||
err = cj.UndoReservation(ctx, monitors, cr, journalPool, "", "", reqName)
|
||||
}
|
||||
return nil, err
|
||||
@ -279,7 +281,7 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
|
||||
if err != nil {
|
||||
// error should specifically be not found, for image to be absent, any other error
|
||||
// is not conclusive, and we should not proceed
|
||||
if _, ok := err.(ErrKeyNotFound); ok {
|
||||
if _, ok := err.(util.ErrKeyNotFound); ok {
|
||||
err = cj.UndoReservation(ctx, monitors, cr, journalPool, savedImagePool,
|
||||
cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName)
|
||||
}
|
||||
@ -315,7 +317,7 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
|
||||
err = fmt.Errorf("snapname points to different volume, request name (%s)"+
|
||||
" source name (%s) saved source name (%s)",
|
||||
reqName, parentName, savedImageAttributes.SourceName)
|
||||
return nil, ErrSnapNameConflict{reqName, err}
|
||||
return nil, util.NewErrSnapNameConflict(reqName, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -344,7 +346,7 @@ Input arguments:
|
||||
- volJournalPool: Pool name that holds the image/subvolume and the per-image journal (may be
|
||||
different if image is created in a topology constrained pool)
|
||||
*/
|
||||
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials,
|
||||
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *util.Credentials,
|
||||
csiJournalPool, volJournalPool, volName, reqName string) error {
|
||||
// delete volume UUID omap (first, inverse of create order)
|
||||
|
||||
@ -358,20 +360,20 @@ func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *
|
||||
return fmt.Errorf("failed parsing UUID in %s", volName)
|
||||
}
|
||||
|
||||
err := RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
err := util.RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectNotFound); !ok {
|
||||
klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
|
||||
if _, ok := err.(util.ErrObjectNotFound); !ok {
|
||||
klog.Errorf(util.Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete the request name key (last, inverse of create order)
|
||||
err := RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory,
|
||||
err := util.RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
|
||||
klog.Errorf(util.Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -381,7 +383,7 @@ func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *
|
||||
// reserveOMapName creates an omap with passed in oMapNamePrefix and a generated <uuid>.
|
||||
// It ensures generated omap name does not already exist and if conflicts are detected, a set
|
||||
// number of retires with newer uuids are attempted before returning an error
|
||||
func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) {
|
||||
func reserveOMapName(ctx context.Context, monitors string, cr *util.Credentials, pool, namespace, oMapNamePrefix string) (string, error) {
|
||||
var iterUUID string
|
||||
|
||||
maxAttempts := 5
|
||||
@ -390,12 +392,12 @@ func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool
|
||||
// generate a uuid for the image name
|
||||
iterUUID = uuid.NewUUID().String()
|
||||
|
||||
err := CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID)
|
||||
err := util.CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrObjectExists); ok {
|
||||
if _, ok := err.(util.ErrObjectExists); ok {
|
||||
attempt++
|
||||
// try again with a different uuid, for maxAttempts tries
|
||||
klog.V(4).Infof(Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"),
|
||||
klog.V(4).Infof(util.Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"),
|
||||
iterUUID, attempt, maxAttempts)
|
||||
continue
|
||||
}
|
||||
@ -433,7 +435,7 @@ Return values:
|
||||
- string: Contains the image name that was reserved for the passed in reqName
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials,
|
||||
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *util.Credentials,
|
||||
journalPool string, journalPoolID int64,
|
||||
imagePool string, imagePoolID int64,
|
||||
reqName, namePrefix, parentName, kmsConf string) (string, string, error) {
|
||||
@ -464,7 +466,7 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
|
||||
|
||||
// Create request name (csiNameKey) key in csiDirectory and store the UUID based
|
||||
// volume name and optionally the image pool location into it
|
||||
if journalPool != imagePool && imagePoolID != InvalidPoolID {
|
||||
if journalPool != imagePool && imagePoolID != util.InvalidPoolID {
|
||||
buf64 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf64, uint64(imagePoolID))
|
||||
poolIDEncodedHex := hex.EncodeToString(buf64)
|
||||
@ -473,17 +475,17 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
|
||||
nameKeyVal = volUUID
|
||||
}
|
||||
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName, nameKeyVal)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName)
|
||||
klog.Warningf(util.Log(ctx, "reservation failed for volume: %s"), reqName)
|
||||
errDefer := cj.UndoReservation(ctx, monitors, cr, imagePool, journalPool, imageName, reqName)
|
||||
if errDefer != nil {
|
||||
klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
|
||||
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -492,14 +494,14 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
|
||||
// and also CSI journal pool, when only the VolumeID is passed in (e.g DeleteVolume/DeleteSnapshot,
|
||||
// VolID during CreateSnapshot).
|
||||
// Update UUID directory to store CSI request name
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.csiNameKey, reqName)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// Update UUID directory to store image name
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.csiImageKey, imageName)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
@ -507,20 +509,20 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
|
||||
|
||||
// Update UUID directory to store encryption values
|
||||
if kmsConf != "" {
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.encryptKMSKey, kmsConf)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
}
|
||||
|
||||
if journalPool != imagePool && journalPoolID != InvalidPoolID {
|
||||
if journalPool != imagePool && journalPoolID != util.InvalidPoolID {
|
||||
buf64 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf64, uint64(journalPoolID))
|
||||
journalPoolIDStr := hex.EncodeToString(buf64)
|
||||
|
||||
// Update UUID directory to store CSI journal pool name (prefer ID instead of name to be pool rename proof)
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.csiJournalPool, journalPoolIDStr)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
@ -529,7 +531,7 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
|
||||
|
||||
if snapSource {
|
||||
// Update UUID directory to store source volume UUID in case of snapshots
|
||||
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
err = util.SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
|
||||
cj.cephSnapSourceKey, parentName)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
@ -549,7 +551,7 @@ type ImageAttributes struct {
|
||||
}
|
||||
|
||||
// GetImageAttributes fetches all keys and their values, from a UUID directory, returning ImageAttributes structure
|
||||
func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) {
|
||||
func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *util.Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) {
|
||||
var (
|
||||
err error
|
||||
imageAttributes *ImageAttributes = &ImageAttributes{}
|
||||
@ -561,7 +563,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
}
|
||||
|
||||
// TODO: fetch all omap vals in one call, than make multiple listomapvals
|
||||
imageAttributes.RequestName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
imageAttributes.RequestName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -569,7 +571,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
|
||||
// image key was added at some point, so not all volumes will have this key set
|
||||
// when ceph-csi was upgraded
|
||||
imageAttributes.ImageName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
imageAttributes.ImageName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
|
||||
if err != nil {
|
||||
// if the key was not found, assume the default key + UUID
|
||||
@ -577,7 +579,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
switch err.(type) {
|
||||
default:
|
||||
return nil, err
|
||||
case ErrKeyNotFound, ErrPoolNotFound:
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
}
|
||||
|
||||
if snapSource {
|
||||
@ -587,7 +589,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
}
|
||||
}
|
||||
|
||||
imageAttributes.KmsID, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
imageAttributes.KmsID, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
|
||||
if err != nil {
|
||||
// ErrKeyNotFound means no encryption KMS was used
|
||||
@ -595,17 +597,17 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
default:
|
||||
return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
|
||||
pool, cj.cephUUIDDirectoryPrefix+objectUUID, err)
|
||||
case ErrKeyNotFound, ErrPoolNotFound:
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
}
|
||||
}
|
||||
|
||||
journalPoolIDStr, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
journalPoolIDStr, err := util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool)
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrKeyNotFound); !ok {
|
||||
if _, ok := err.(util.ErrKeyNotFound); !ok {
|
||||
return nil, err
|
||||
}
|
||||
imageAttributes.JournalPoolID = InvalidPoolID
|
||||
imageAttributes.JournalPoolID = util.InvalidPoolID
|
||||
} else {
|
||||
var buf64 []byte
|
||||
buf64, err = hex.DecodeString(journalPoolIDStr)
|
||||
@ -616,7 +618,7 @@ func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, c
|
||||
}
|
||||
|
||||
if snapSource {
|
||||
imageAttributes.SourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
imageAttributes.SourceName, err = util.GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
|
||||
if err != nil {
|
||||
return nil, err
|
@ -18,6 +18,7 @@ package rbd
|
||||
|
||||
import (
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
"github.com/ceph/ceph-csi/internal/journal"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@ -50,8 +51,8 @@ var (
|
||||
|
||||
// volJournal and snapJournal are used to maintain RADOS based journals for CO generated
|
||||
// VolumeName to backing RBD images
|
||||
volJournal *util.CSIJournal
|
||||
snapJournal *util.CSIJournal
|
||||
volJournal *journal.CSIJournal
|
||||
snapJournal *journal.CSIJournal
|
||||
)
|
||||
|
||||
// NewDriver returns new rbd driver
|
||||
@ -103,8 +104,8 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
}
|
||||
|
||||
// Get an instance of the volume and snapshot journal keys
|
||||
volJournal = util.NewCSIVolumeJournal()
|
||||
snapJournal = util.NewCSISnapshotJournal()
|
||||
volJournal = journal.NewCSIVolumeJournal()
|
||||
snapJournal = journal.NewCSISnapshotJournal()
|
||||
|
||||
// Update keys with CSI instance suffix
|
||||
volJournal.SetCSIDirectorySuffix(CSIInstanceID)
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"strings"
|
||||
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
"github.com/ceph/ceph-csi/internal/journal"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@ -145,7 +146,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
default:
|
||||
var vi util.CSIIdentifier
|
||||
var imageAttributes *util.ImageAttributes
|
||||
var imageAttributes *journal.ImageAttributes
|
||||
err = vi.DecomposeCSIID(volID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
|
||||
|
Loading…
Reference in New Issue
Block a user