rbd: introduce rbdImage as base for rbdVolume and rbdSnapshot

Because rbdVolume and rbdSnapshot are very similar, they can be based
off a common struct rbdImage that contains the common attributes and
functions.

This makes it possible to re-use functions for snapshots, and prevents
further duplication or code.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2021-03-12 13:37:15 +01:00 committed by mergify[bot]
parent 3fe714c4fa
commit 10a75dd4ff
4 changed files with 171 additions and 167 deletions

View File

@ -46,10 +46,10 @@ import (
func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) (bool, error) {
// generate temp cloned volume
tempClone := rv.generateTempClone()
snap := &rbdSnapshot{
RbdSnapName: rv.RbdImageName,
Pool: rv.Pool,
}
snap := &rbdSnapshot{}
snap.RbdSnapName = rv.RbdImageName
snap.Pool = rv.Pool
// check if cloned image exists
err := rv.getImageInfo()
if err == nil {
@ -145,18 +145,17 @@ func (rv *rbdVolume) generateTempClone() *rbdVolume {
func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVolume) error {
// generate temp cloned volume
tempClone := rv.generateTempClone()
tempSnap := &rbdSnapshot{
// snapshot name is same as temporary cloned image, This helps to
// flatten the temporary cloned images as we cannot have more than 510
// snapshots on an rbd image
RbdSnapName: tempClone.RbdImageName,
Pool: rv.Pool,
}
tempSnap := &rbdSnapshot{}
tempSnap.RbdSnapName = tempClone.RbdImageName
tempSnap.Pool = rv.Pool
cloneSnap := &rbdSnapshot{}
cloneSnap.RbdSnapName = rv.RbdImageName
cloneSnap.Pool = rv.Pool
cloneSnap := &rbdSnapshot{
RbdSnapName: rv.RbdImageName,
Pool: rv.Pool,
}
var (
errClone error
errFlatten error

View File

@ -61,71 +61,71 @@ const (
)
// checkRbdImageEncrypted verifies if rbd image was encrypted when created.
func (rv *rbdVolume) checkRbdImageEncrypted(ctx context.Context) (rbdEncryptionState, error) {
value, err := rv.GetMetadata(encryptionMetaKey)
func (ri *rbdImage) checkRbdImageEncrypted(ctx context.Context) (rbdEncryptionState, error) {
value, err := ri.GetMetadata(encryptionMetaKey)
if errors.Is(err, librbd.ErrNotFound) {
util.DebugLog(ctx, "image %s encrypted state not set", rv.String())
util.DebugLog(ctx, "image %s encrypted state not set", ri.String())
return rbdImageEncryptionUnknown, nil
} else if err != nil {
util.ErrorLog(ctx, "checking image %s encrypted state metadata failed: %s", rv.String(), err)
util.ErrorLog(ctx, "checking image %s encrypted state metadata failed: %s", ri.String(), err)
return rbdImageEncryptionUnknown, err
}
encrypted := rbdEncryptionState(strings.TrimSpace(value))
util.DebugLog(ctx, "image %s encrypted state metadata reports %q", rv.String(), encrypted)
util.DebugLog(ctx, "image %s encrypted state metadata reports %q", ri.String(), encrypted)
return encrypted, nil
}
func (rv *rbdVolume) ensureEncryptionMetadataSet(status rbdEncryptionState) error {
err := rv.SetMetadata(encryptionMetaKey, string(status))
func (ri *rbdImage) ensureEncryptionMetadataSet(status rbdEncryptionState) error {
err := ri.SetMetadata(encryptionMetaKey, string(status))
if err != nil {
return fmt.Errorf("failed to save encryption status for %s: %w", rv, err)
return fmt.Errorf("failed to save encryption status for %s: %w", ri, err)
}
return nil
}
// isEncrypted returns `true` if the rbdVolume is (or needs to be) encrypted.
func (rv *rbdVolume) isEncrypted() bool {
return rv.encryption != nil
// isEncrypted returns `true` if the rbdImage is (or needs to be) encrypted.
func (ri *rbdImage) isEncrypted() bool {
return ri.encryption != nil
}
// setupEncryption configures the metadata of the RBD image for encryption:
// - the Data-Encryption-Key (DEK) will be generated stored for use by the KMS;
// - the RBD image will be marked to support encryption in its metadata.
func (rv *rbdVolume) setupEncryption(ctx context.Context) error {
err := rv.encryption.StoreNewCryptoPassphrase(rv.VolID)
func (ri *rbdImage) setupEncryption(ctx context.Context) error {
err := ri.encryption.StoreNewCryptoPassphrase(ri.VolID)
if err != nil {
util.ErrorLog(ctx, "failed to save encryption passphrase for "+
"image %s: %s", rv.String(), err)
"image %s: %s", ri.String(), err)
return err
}
err = rv.ensureEncryptionMetadataSet(rbdImageEncryptionPrepared)
err = ri.ensureEncryptionMetadataSet(rbdImageEncryptionPrepared)
if err != nil {
util.ErrorLog(ctx, "failed to save encryption status, deleting "+
"image %s: %s", rv.String(), err)
"image %s: %s", ri.String(), err)
return err
}
return nil
}
func (rv *rbdVolume) encryptDevice(ctx context.Context, devicePath string) error {
passphrase, err := rv.encryption.GetCryptoPassphrase(rv.VolID)
func (ri *rbdImage) encryptDevice(ctx context.Context, devicePath string) error {
passphrase, err := ri.encryption.GetCryptoPassphrase(ri.VolID)
if err != nil {
util.ErrorLog(ctx, "failed to get crypto passphrase for %s: %v",
rv.String(), err)
ri.String(), err)
return err
}
if err = util.EncryptVolume(ctx, devicePath, passphrase); err != nil {
err = fmt.Errorf("failed to encrypt volume %s: %w", rv.String(), err)
err = fmt.Errorf("failed to encrypt volume %s: %w", ri.String(), err)
util.ErrorLog(ctx, err.Error())
return err
}
err = rv.ensureEncryptionMetadataSet(rbdImageEncrypted)
err = ri.ensureEncryptionMetadataSet(rbdImageEncrypted)
if err != nil {
util.ErrorLog(ctx, err.Error())
return err
@ -163,7 +163,7 @@ func (rv *rbdVolume) openEncryptedDevice(ctx context.Context, devicePath string)
return mapperFilePath, nil
}
func (rv *rbdVolume) initKMS(ctx context.Context, volOptions, credentials map[string]string) error {
func (ri *rbdImage) initKMS(ctx context.Context, volOptions, credentials map[string]string) error {
var (
err error
ok bool
@ -174,9 +174,9 @@ func (rv *rbdVolume) initKMS(ctx context.Context, volOptions, credentials map[st
// depending on the tenant, the KMS can be configured with other
// options
// FIXME: this works only on Kubernetes, how do other CO supply metadata?
rv.Owner, ok = volOptions["csi.storage.k8s.io/pvc/namespace"]
ri.Owner, ok = volOptions["csi.storage.k8s.io/pvc/namespace"]
if !ok {
util.DebugLog(ctx, "could not detect owner for %s", rv.String())
util.DebugLog(ctx, "could not detect owner for %s", ri.String())
}
encrypted, ok = volOptions["encrypted"]
@ -192,7 +192,7 @@ func (rv *rbdVolume) initKMS(ctx context.Context, volOptions, credentials map[st
return nil
}
err = rv.configureEncryption(volOptions["encryptionKMSID"], credentials)
err = ri.configureEncryption(volOptions["encryptionKMSID"], credentials)
if err != nil {
return fmt.Errorf("invalid encryption kms configuration: %w", err)
}
@ -200,48 +200,48 @@ func (rv *rbdVolume) initKMS(ctx context.Context, volOptions, credentials map[st
return nil
}
// configureEncryption sets up the VolumeEncryption for this rbdVolume. Once
// configureEncryption sets up the VolumeEncryption for this rbdImage. Once
// configured, use isEncrypted() to see if the volume supports encryption.
func (rv *rbdVolume) configureEncryption(kmsID string, credentials map[string]string) error {
kms, err := util.GetKMS(rv.Owner, kmsID, credentials)
func (ri *rbdImage) configureEncryption(kmsID string, credentials map[string]string) error {
kms, err := util.GetKMS(ri.Owner, kmsID, credentials)
if err != nil {
return err
}
rv.encryption, err = util.NewVolumeEncryption(kms)
ri.encryption, err = util.NewVolumeEncryption(kms)
// if the KMS can not store the DEK itself, we'll store it in the
// metadata of the RBD image itself
if errors.Is(err, util.ErrDEKStoreNeeded) {
rv.encryption.SetDEKStore(rv)
ri.encryption.SetDEKStore(ri)
}
return nil
}
// StoreDEK saves the DEK in the metadata, overwrites any existing contents.
func (rv *rbdVolume) StoreDEK(volumeID, dek string) error {
if rv.VolID != volumeID {
return fmt.Errorf("volume %q can not store DEK for %q", rv.String(), volumeID)
func (ri *rbdImage) StoreDEK(volumeID, dek string) error {
if ri.VolID != volumeID {
return fmt.Errorf("volume %q can not store DEK for %q", ri.String(), volumeID)
}
return rv.SetMetadata(metadataDEK, dek)
return ri.SetMetadata(metadataDEK, dek)
}
// FetchDEK reads the DEK from the image metadata.
func (rv *rbdVolume) FetchDEK(volumeID string) (string, error) {
if rv.VolID != volumeID {
return "", fmt.Errorf("volume %q can not fetch DEK for %q", rv.String(), volumeID)
func (ri *rbdImage) FetchDEK(volumeID string) (string, error) {
if ri.VolID != volumeID {
return "", fmt.Errorf("volume %q can not fetch DEK for %q", ri.String(), volumeID)
}
return rv.GetMetadata(metadataDEK)
return ri.GetMetadata(metadataDEK)
}
// RemoveDEK does not need to remove the DEK from the metadata, the image is
// most likely getting removed.
func (rv *rbdVolume) RemoveDEK(volumeID string) error {
if rv.VolID != volumeID {
return fmt.Errorf("volume %q can not remove DEK for %q", rv.String(), volumeID)
func (ri *rbdImage) RemoveDEK(volumeID string) error {
if ri.VolID != volumeID {
return fmt.Errorf("volume %q can not remove DEK for %q", ri.String(), volumeID)
}
return nil

View File

@ -491,7 +491,8 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
)
options = make(map[string]string)
rbdVol = &rbdVolume{VolID: volumeID}
rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID
err := vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {

View File

@ -58,39 +58,66 @@ const (
thickProvisionMetaKey = ".rbd.csi.ceph.com/thick-provisioned"
)
// rbdImage contains common attributes and methods for the rbdVolume and
// rbdSnapshot types.
type rbdImage struct {
// RbdImageName is the name of the RBD image backing this rbdVolume.
// This does not have a JSON tag as it is not stashed in JSON encoded
// config maps in v1.0.0
RbdImageName string
// ImageID contains the image id of the image
ImageID string
// VolID is the volume ID that is exchanged with CSI drivers,
// identifying this rbd image
VolID string `json:"volID"`
Monitors string
// JournalPool is the ceph pool in which the CSI snapshot Journal is
// stored
JournalPool string
// Pool is where the image snapshot journal and snapshot is stored, and
// could be the same as `JournalPool` (retained as Pool instead of
// renaming to ImagePool or such, as this is referenced in the code
// extensively)
Pool string
RadosNamespace string
ClusterID string `json:"clusterId"`
// RequestName is the CSI generated volume name for the rbdVolume.
// This does not have a JSON tag as it is not stashed in JSON encoded
// config maps in v1.0.0
RequestName string
NamePrefix string
// encryption provides access to optional VolumeEncryption functions
encryption *util.VolumeEncryption
// Owner is the creator (tenant, Kubernetes Namespace) of the volume
Owner string
CreatedAt *timestamp.Timestamp
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection
// an opened IOContext, call .openIoctx() before using
ioctx *rados.IOContext
}
// rbdVolume represents a CSI volume and its RBD image specifics.
type rbdVolume struct {
// RbdImageName is the name of the RBD image backing this rbdVolume. This does not have a
// JSON tag as it is not stashed in JSON encoded config maps in v1.0.0
// VolID is the volume ID that is exchanged with CSI drivers, identifying this rbdVol
// RequestName is the CSI generated volume name for the rbdVolume. This does not have a
// JSON tag as it is not stashed in JSON encoded config maps in v1.0.0
rbdImage
// VolName and MonValueFromSecret are retained from older plugin versions (<= 1.0.0)
// for backward compatibility reasons
// JournalPool is the ceph pool in which the CSI Journal is stored
// Pool is where the image journal and image is stored, and could be the same as `JournalPool`
// (retained as Pool instead of renaming to ImagePool or such, as this is referenced in the code extensively)
// DataPool is where the data for images in `Pool` are stored, this is used as the `--data-pool`
// argument when the pool is created, and is not used anywhere else
TopologyPools *[]util.TopologyConstrainedPool
TopologyRequirement *csi.TopologyRequirement
Topology map[string]string
RbdImageName string
NamePrefix string
VolID string `json:"volID"`
Monitors string `json:"monitors"`
JournalPool string
Pool string `json:"pool"`
// DataPool is where the data for images in `Pool` are stored, this is used as the `--data-pool`
// argument when the pool is created, and is not used anywhere else
DataPool string
RadosNamespace string
ImageID string
ParentName string
imageFeatureSet librbd.FeatureSet
AdminID string `json:"adminId"`
UserID string `json:"userId"`
Mounter string `json:"mounter"`
ClusterID string `json:"clusterId"`
RequestName string
ReservedID string
MapOptions string
UnmapOptions string
@ -101,41 +128,18 @@ type rbdVolume struct {
readOnly bool
Primary bool
ThickProvision bool
encryption *util.VolumeEncryption
// Owner is the creator (tenant, Kubernetes Namespace) of the volume.
Owner string
CreatedAt *timestamp.Timestamp
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection
// an opened IOContext, call .openIoctx() before using
ioctx *rados.IOContext
}
// rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics.
type rbdSnapshot struct {
rbdImage
// SourceVolumeID is the volume ID of RbdImageName, that is exchanged with CSI drivers
// RbdImageName is the name of the RBD image, that is this rbdSnapshot's source image
// RbdSnapName is the name of the RBD snapshot backing this rbdSnapshot
// VolID is the snapshot ID that is exchanged with CSI drivers, identifying this rbdSnapshot
// RequestName is the CSI generated snapshot name for the rbdSnapshot
// JournalPool is the ceph pool in which the CSI snapshot Journal is stored
// Pool is where the image snapshot journal and snapshot is stored, and could be the same as `JournalPool`
// ImageID contains the image id of cloned image
SourceVolumeID string
RbdImageName string
ReservedID string
NamePrefix string
RbdSnapName string
VolID string
ImageID string
Monitors string
JournalPool string
Pool string
RadosNamespace string
CreatedAt *timestamp.Timestamp
SizeBytes int64
ClusterID string
RequestName string
}
var (
@ -143,40 +147,40 @@ var (
)
// Connect an rbdVolume to the Ceph cluster.
func (rv *rbdVolume) Connect(cr *util.Credentials) error {
if rv.conn != nil {
func (ri *rbdImage) Connect(cr *util.Credentials) error {
if ri.conn != nil {
return nil
}
conn := &util.ClusterConnection{}
if err := conn.Connect(rv.Monitors, cr); err != nil {
if err := conn.Connect(ri.Monitors, cr); err != nil {
return err
}
rv.conn = conn
ri.conn = conn
return nil
}
// Destroy cleans up the rbdVolume and closes the connection to the Ceph
// cluster in case one was setup.
func (rv *rbdVolume) Destroy() {
if rv.ioctx != nil {
rv.ioctx.Destroy()
func (ri *rbdImage) Destroy() {
if ri.ioctx != nil {
ri.ioctx.Destroy()
}
if rv.conn != nil {
rv.conn.Destroy()
if ri.conn != nil {
ri.conn.Destroy()
}
if rv.isEncrypted() {
rv.encryption.Destroy()
if ri.isEncrypted() {
ri.encryption.Destroy()
}
}
// String returns the image-spec (pool/{namespace/}image) format of the image.
func (rv *rbdVolume) String() string {
if rv.RadosNamespace != "" {
return fmt.Sprintf("%s/%s/%s", rv.Pool, rv.RadosNamespace, rv.RbdImageName)
func (ri *rbdImage) String() string {
if ri.RadosNamespace != "" {
return fmt.Sprintf("%s/%s/%s", ri.Pool, ri.RadosNamespace, ri.RbdImageName)
}
return fmt.Sprintf("%s/%s", rv.Pool, rv.RbdImageName)
return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName)
}
// String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot.
@ -247,19 +251,19 @@ func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
return nil
}
func (rv *rbdVolume) openIoctx() error {
if rv.ioctx != nil {
func (ri *rbdImage) openIoctx() error {
if ri.ioctx != nil {
return nil
}
ioctx, err := rv.conn.GetIoctx(rv.Pool)
ioctx, err := ri.conn.GetIoctx(ri.Pool)
if err != nil {
// GetIoctx() can return util.ErrPoolNotFound
return err
}
ioctx.SetNamespace(rv.RadosNamespace)
rv.ioctx = ioctx
ioctx.SetNamespace(ri.RadosNamespace)
ri.ioctx = ioctx
return nil
}
@ -284,17 +288,17 @@ func (rv *rbdVolume) getImageID() error {
return nil
}
// open the rbdVolume after it has been connected.
// open the rbdImage after it has been connected.
// ErrPoolNotFound or ErrImageNotFound are returned in case the pool or image
// can not be found, other errors will contain more details about other issues
// (permission denied, ...) and are expected to relate to configuration issues.
func (rv *rbdVolume) open() (*librbd.Image, error) {
err := rv.openIoctx()
func (ri *rbdImage) open() (*librbd.Image, error) {
err := ri.openIoctx()
if err != nil {
return nil, err
}
image, err := librbd.OpenImage(rv.ioctx, rv.RbdImageName, librbd.NoSnapshot)
image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
err = util.JoinErrors(ErrImageNotFound, err)
@ -488,12 +492,11 @@ func deleteImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) {
var depth uint
vol := rbdVolume{
Pool: rv.Pool,
Monitors: rv.Monitors,
RbdImageName: rv.RbdImageName,
conn: rv.conn,
}
vol := rbdVolume{}
vol.Pool = rv.Pool
vol.Monitors = rv.Monitors
vol.RbdImageName = rv.RbdImageName
vol.conn = rv.conn
err := vol.openIoctx()
if err != nil {
@ -530,11 +533,11 @@ type trashSnapInfo struct {
}
func flattenClonedRbdImages(ctx context.Context, snaps []librbd.SnapInfo, pool, monitors, rbdImageName string, cr *util.Credentials) error {
rv := &rbdVolume{
Monitors: monitors,
Pool: pool,
RbdImageName: rbdImageName,
}
rv := &rbdVolume{}
rv.Monitors = monitors
rv.Pool = pool
rv.RbdImageName = rbdImageName
defer rv.Destroy()
err := rv.Connect(cr)
if err != nil {
@ -658,13 +661,13 @@ func (rv *rbdVolume) hasFeature(feature uint64) bool {
}
func (rv *rbdVolume) checkImageChainHasFeature(ctx context.Context, feature uint64) (bool, error) {
vol := rbdVolume{
Pool: rv.Pool,
RadosNamespace: rv.RadosNamespace,
Monitors: rv.Monitors,
RbdImageName: rv.RbdImageName,
conn: rv.conn,
}
vol := rbdVolume{}
vol.Pool = rv.Pool
vol.RadosNamespace = rv.RadosNamespace
vol.Monitors = rv.Monitors
vol.RbdImageName = rv.RbdImageName
vol.conn = rv.conn
err := vol.openIoctx()
if err != nil {
return false, err
@ -765,7 +768,8 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
// rbdVolume fields that are not filled up in this function are:
// Mounter, MultiNodeWritable
rbdVol = &rbdVolume{VolID: volumeID}
rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID
err = vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
@ -1269,8 +1273,8 @@ func (rv *rbdVolume) resize(newSize int64) error {
return nil
}
func (rv *rbdVolume) GetMetadata(key string) (string, error) {
image, err := rv.open()
func (ri *rbdImage) GetMetadata(key string) (string, error) {
image, err := ri.open()
if err != nil {
return "", err
}
@ -1279,8 +1283,8 @@ func (rv *rbdVolume) GetMetadata(key string) (string, error) {
return image.GetMetadata(key)
}
func (rv *rbdVolume) SetMetadata(key, value string) error {
image, err := rv.open()
func (ri *rbdImage) SetMetadata(key, value string) error {
image, err := ri.open()
if err != nil {
return err
}